Skip to content

Commit 1785bd4

Browse files
committed
Add failure variant for GetState, PollInputStream and SleepEntryMessage
This commit updates the SDK to the latest service protocol version. Part of it is to enable failure variants for the GetState, PollInputStream and SleepEntryMessages so that the runtime can cancel these entries. This commit also adds tests for verifying the changes. This fixes #210.
1 parent a687a91 commit 1785bd4

File tree

9 files changed

+193
-35
lines changed

9 files changed

+193
-35
lines changed

proto/protocol.proto

Lines changed: 29 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -96,13 +96,18 @@ message ErrorMessage {
9696

9797
// ------ Input and output ------
9898

99-
// Kind: Completable JournalEntry
99+
// Completable: Yes
100+
// Fallible: No
100101
// Type: 0x0400 + 0
101102
message PollInputStreamEntryMessage {
102-
bytes value = 14;
103+
oneof result {
104+
bytes value = 14;
105+
Failure failure = 15;
106+
}
103107
}
104108

105-
// Kind: Non-Completable JournalEntry
109+
// Completable: No
110+
// Fallible: No
106111
// Type: 0x0400 + 1
107112
message OutputStreamEntryMessage {
108113
oneof result {
@@ -113,43 +118,52 @@ message OutputStreamEntryMessage {
113118

114119
// ------ State access ------
115120

116-
// Kind: Completable JournalEntry
121+
// Completable: Yes
122+
// Fallible: No
117123
// Type: 0x0800 + 0
118124
message GetStateEntryMessage {
119125
bytes key = 1;
120126

121127
oneof result {
122128
google.protobuf.Empty empty = 13;
123129
bytes value = 14;
130+
Failure failure = 15;
124131
};
125132
}
126133

127-
// Kind: Non-Completable JournalEntry
134+
// Completable: No
135+
// Fallible: No
128136
// Type: 0x0800 + 1
129137
message SetStateEntryMessage {
130138
bytes key = 1;
131139
bytes value = 3;
132140
}
133141

134-
// Kind: Non-Completable JournalEntry
142+
// Completable: No
143+
// Fallible: No
135144
// Type: 0x0800 + 2
136145
message ClearStateEntryMessage {
137146
bytes key = 1;
138147
}
139148

140149
// ------ Syscalls ------
141150

142-
// Kind: Completable JournalEntry
151+
// Completable: Yes
152+
// Fallible: No
143153
// Type: 0x0C00 + 0
144154
message SleepEntryMessage {
145155
// Wake up time.
146156
// The time is set as duration since UNIX Epoch.
147157
uint64 wake_up_time = 1;
148158

149-
google.protobuf.Empty result = 13;
159+
oneof result {
160+
google.protobuf.Empty empty = 13;
161+
Failure failure = 15;
162+
}
150163
}
151164

152-
// Kind: Completable JournalEntry
165+
// Completable: Yes
166+
// Fallible: Yes
153167
// Type: 0x0C00 + 1
154168
message InvokeEntryMessage {
155169
string service_name = 1;
@@ -163,7 +177,8 @@ message InvokeEntryMessage {
163177
};
164178
}
165179

166-
// Kind: Non-Completable JournalEntry
180+
// Completable: No
181+
// Fallible: Yes
167182
// Type: 0x0C00 + 2
168183
message BackgroundInvokeEntryMessage {
169184
string service_name = 1;
@@ -178,7 +193,8 @@ message BackgroundInvokeEntryMessage {
178193
uint64 invoke_time = 4;
179194
}
180195

181-
// Kind: Completable JournalEntry
196+
// Completable: Yes
197+
// Fallible: No
182198
// Type: 0x0C00 + 3
183199
// Awakeables are addressed by an identifier exposed to the user. See the spec for more details.
184200
message AwakeableEntryMessage {
@@ -188,7 +204,8 @@ message AwakeableEntryMessage {
188204
};
189205
}
190206

191-
// Kind: Non-Completable JournalEntry
207+
// Completable: No
208+
// Fallible: Yes
192209
// Type: 0x0C00 + 4
193210
message CompleteAwakeableEntryMessage {
194211
// Identifier of the awakeable. See the spec for more details.

src/invocation.ts

Lines changed: 31 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
import { Message } from "./types/types";
1515
import { HostedGrpcServiceMethod } from "./types/grpc";
1616
import {
17+
Failure,
1718
PollInputStreamEntryMessage,
1819
StartMessage,
1920
} from "./generated/proto/protocol";
@@ -37,6 +38,10 @@ enum State {
3738
Complete = 3,
3839
}
3940

41+
type InvocationValue =
42+
| { kind: "value"; value: Buffer }
43+
| { kind: "failure"; failure: Failure };
44+
4045
export class InvocationBuilder<I, O> implements RestateStreamConsumer {
4146
private readonly complete = new CompletablePromise<void>();
4247

@@ -46,7 +51,7 @@ export class InvocationBuilder<I, O> implements RestateStreamConsumer {
4651
private replayEntries = new Map<number, Message>();
4752
private id?: Buffer = undefined;
4853
private debugId?: string = undefined;
49-
private invocationValue?: Buffer = undefined;
54+
private invocationValue?: InvocationValue = undefined;
5055
private nbEntriesToReplay?: number = undefined;
5156
private localStateStore?: LocalStateStore;
5257

@@ -67,6 +72,8 @@ export class InvocationBuilder<I, O> implements RestateStreamConsumer {
6772
POLL_INPUT_STREAM_ENTRY_MESSAGE_TYPE,
6873
m
6974
);
75+
76+
this.handlePollInputStreamEntry(m);
7077
this.addReplayEntry(m);
7178
break;
7279

@@ -100,6 +107,28 @@ export class InvocationBuilder<I, O> implements RestateStreamConsumer {
100107
}
101108
}
102109

110+
private handlePollInputStreamEntry(m: Message) {
111+
const pollInputStreamMessage = m.message as PollInputStreamEntryMessage;
112+
113+
if (pollInputStreamMessage.value !== undefined) {
114+
this.invocationValue = {
115+
kind: "value",
116+
value: pollInputStreamMessage.value,
117+
};
118+
} else if (pollInputStreamMessage.failure !== undefined) {
119+
this.invocationValue = {
120+
kind: "failure",
121+
failure: pollInputStreamMessage.failure,
122+
};
123+
} else {
124+
throw new Error(
125+
`PollInputStreamEntry neither contains value nor failure: ${printMessageAsJson(
126+
m
127+
)}`
128+
);
129+
}
130+
}
131+
103132
public handleStreamError(e: Error): void {
104133
this.complete.reject(e);
105134
}
@@ -120,10 +149,6 @@ export class InvocationBuilder<I, O> implements RestateStreamConsumer {
120149
}
121150

122151
private addReplayEntry(m: Message): InvocationBuilder<I, O> {
123-
if (m.messageType === POLL_INPUT_STREAM_ENTRY_MESSAGE_TYPE) {
124-
this.invocationValue = (m.message as PollInputStreamEntryMessage).value;
125-
}
126-
127152
// Will be retrieved when the user code reaches this point
128153
this.replayEntries.set(this.runtimeReplayIndex, m);
129154
this.incrementRuntimeReplayIndex();
@@ -164,7 +189,7 @@ export class Invocation<I, O> {
164189
public readonly debugId: string,
165190
public readonly nbEntriesToReplay: number,
166191
public readonly replayEntries: Map<number, Message>,
167-
public readonly invocationValue: Buffer,
192+
public readonly invocationValue: InvocationValue,
168193
public readonly localStateStore: LocalStateStore
169194
) {
170195
this.logPrefix = `[${makeFqServiceName(

src/journal.ts

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -260,7 +260,8 @@ export class Journal<I, O> {
260260
this.resolveResult(
261261
journalIndex,
262262
journalEntry,
263-
getStateMsg.value || getStateMsg.empty
263+
getStateMsg.value || getStateMsg.empty,
264+
getStateMsg.failure
264265
);
265266
break;
266267
}
@@ -276,7 +277,12 @@ export class Journal<I, O> {
276277
}
277278
case SLEEP_ENTRY_MESSAGE_TYPE: {
278279
const sleepMsg = replayMessage.message as SleepEntryMessage;
279-
this.resolveResult(journalIndex, journalEntry, sleepMsg.result);
280+
this.resolveResult(
281+
journalIndex,
282+
journalEntry,
283+
sleepMsg.empty,
284+
sleepMsg.failure
285+
);
280286
break;
281287
}
282288
case AWAKEABLE_ENTRY_MESSAGE_TYPE: {

src/state_machine.ts

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import {
3333
TerminalError,
3434
RetryableError,
3535
errorToErrorMessage,
36+
failureToTerminalError,
3637
} from "./types/errors";
3738
import { LocalStateStore } from "./local_state_store";
3839

@@ -211,10 +212,21 @@ export class StateMachine<I, O> implements RestateStreamConsumer {
211212
rlog.debugInvokeMessage(this.invocation.logPrefix, "Invoking function.");
212213
}
213214

214-
const resultBytes: Promise<Uint8Array> = this.invocation.method.invoke(
215-
this.restateContext,
216-
this.invocation.invocationValue
217-
);
215+
let resultBytes: Promise<Uint8Array>;
216+
217+
switch (this.invocation.invocationValue.kind) {
218+
case "value":
219+
resultBytes = this.invocation.method.invoke(
220+
this.restateContext,
221+
this.invocation.invocationValue.value
222+
);
223+
break;
224+
case "failure":
225+
resultBytes = Promise.reject(
226+
failureToTerminalError(this.invocation.invocationValue.failure)
227+
);
228+
break;
229+
}
218230

219231
resultBytes
220232
.then((bytes) => {

src/types/errors.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -260,6 +260,10 @@ export function errorToFailureWithTerminal(err: Error): FailureWithTerminal {
260260
});
261261
}
262262

263+
export function failureToTerminalError(failure: Failure): TerminalError {
264+
return failureToError(failure, true) as TerminalError;
265+
}
266+
263267
export function failureToError(
264268
failure: Failure,
265269
terminalError: boolean

test/get_state.test.ts

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,9 @@ import * as restate from "../src/public_api";
1414
import { TestDriver } from "./testdriver";
1515
import {
1616
checkJournalMismatchError,
17+
checkTerminalError,
1718
completionMessage,
19+
failure,
1820
getStateMessage,
1921
greetRequest,
2022
greetResponse,
@@ -106,6 +108,18 @@ describe("GetStringStateGreeter", () => {
106108
]);
107109
});
108110

111+
it("handles completion with failure", async () => {
112+
const result = await new TestDriver(new GetStringStateGreeter(), [
113+
startMessage(),
114+
inputMessage(greetRequest("Till")),
115+
completionMessage(1, undefined, undefined, failure("Canceled")),
116+
]).run();
117+
118+
expect(result.length).toStrictEqual(2);
119+
expect(result[0]).toStrictEqual(getStateMessage("STATE"));
120+
checkTerminalError(result[1], "Canceled");
121+
});
122+
109123
it("handles replay with value", async () => {
110124
const result = await new TestDriver(new GetStringStateGreeter(), [
111125
startMessage(),
@@ -129,6 +143,17 @@ describe("GetStringStateGreeter", () => {
129143
outputMessage(greetResponse("Hello nobody")),
130144
]);
131145
});
146+
147+
it("handles replay with failure", async () => {
148+
const result = await new TestDriver(new GetStringStateGreeter(), [
149+
startMessage(),
150+
inputMessage(greetRequest("Till")),
151+
getStateMessage("STATE", undefined, undefined, failure("Canceled")),
152+
]).run();
153+
154+
expect(result.length).toStrictEqual(1);
155+
checkTerminalError(result[0], "Canceled");
156+
});
132157
});
133158

134159
class GetNumberStateGreeter implements TestGreeter {

test/protoutils.ts

Lines changed: 43 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -90,13 +90,24 @@ export function toStateEntries(entries: Buffer[][]) {
9090
);
9191
}
9292

93-
export function inputMessage(value: Uint8Array): Message {
94-
return new Message(
95-
POLL_INPUT_STREAM_ENTRY_MESSAGE_TYPE,
96-
PollInputStreamEntryMessage.create({
97-
value: Buffer.from(value),
98-
})
99-
);
93+
export function inputMessage(value?: Uint8Array, failure?: Failure): Message {
94+
if (failure !== undefined) {
95+
return new Message(
96+
POLL_INPUT_STREAM_ENTRY_MESSAGE_TYPE,
97+
PollInputStreamEntryMessage.create({
98+
failure: failure,
99+
})
100+
);
101+
} else if (value !== undefined) {
102+
return new Message(
103+
POLL_INPUT_STREAM_ENTRY_MESSAGE_TYPE,
104+
PollInputStreamEntryMessage.create({
105+
value: Buffer.from(value),
106+
})
107+
);
108+
} else {
109+
throw new Error("Input message needs either a value or a failure set.");
110+
}
100111
}
101112

102113
export function outputMessage(value?: Uint8Array, failure?: Failure): Message {
@@ -130,7 +141,8 @@ export function outputMessage(value?: Uint8Array, failure?: Failure): Message {
130141
export function getStateMessage<T>(
131142
key: string,
132143
value?: T,
133-
empty?: boolean
144+
empty?: boolean,
145+
failure?: Failure
134146
): Message {
135147
if (empty === true) {
136148
return new Message(
@@ -148,6 +160,14 @@ export function getStateMessage<T>(
148160
value: Buffer.from(jsonSerialize(value)),
149161
})
150162
);
163+
} else if (failure !== undefined) {
164+
return new Message(
165+
GET_STATE_ENTRY_MESSAGE_TYPE,
166+
GetStateEntryMessage.create({
167+
key: Buffer.from(key),
168+
failure: failure,
169+
})
170+
);
151171
} else {
152172
return new Message(
153173
GET_STATE_ENTRY_MESSAGE_TYPE,
@@ -177,13 +197,25 @@ export function clearStateMessage(key: string): Message {
177197
);
178198
}
179199

180-
export function sleepMessage(wakeupTime: number, result?: Empty): Message {
181-
if (result !== undefined) {
200+
export function sleepMessage(
201+
wakeupTime: number,
202+
empty?: Empty,
203+
failure?: Failure
204+
): Message {
205+
if (empty !== undefined) {
182206
return new Message(
183207
SLEEP_ENTRY_MESSAGE_TYPE,
184208
SleepEntryMessage.create({
185209
wakeUpTime: wakeupTime,
186-
result: result,
210+
empty: empty,
211+
})
212+
);
213+
} else if (failure !== undefined) {
214+
return new Message(
215+
SLEEP_ENTRY_MESSAGE_TYPE,
216+
SleepEntryMessage.create({
217+
wakeUpTime: wakeupTime,
218+
failure: failure,
187219
})
188220
);
189221
} else {

0 commit comments

Comments
 (0)