Skip to content

Commit ae50697

Browse files
Use the new REQUIRES_ACK flag and the EntryAckMessage (#195)
1 parent a18b44c commit ae50697

File tree

8 files changed

+128
-69
lines changed

8 files changed

+128
-69
lines changed

proto/protocol.proto

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,11 @@
1-
/*
2-
* Copyright (c) 2023 - Restate Software, Inc., Restate GmbH
3-
*
4-
* This file is part of the Restate SDK for Node.js/TypeScript,
5-
* which is released under the MIT license.
6-
*
7-
* You can find a copy of the license in file LICENSE in the root
8-
* directory of this repository or package, or at
9-
* https://github.com/restatedev/sdk-typescript/blob/main/LICENSE
10-
*/
1+
// Copyright (c) 2023 - Restate Software, Inc., Restate GmbH
2+
//
3+
// This file is part of the Restate service protocol, which is
4+
// released under the MIT license.
5+
//
6+
// You can find a copy of the license in file LICENSE in the root
7+
// directory of this repository or package, or at
8+
// https://github.com/restatedev/service-protocol/blob/main/LICENSE
119

1210
syntax = "proto3";
1311

@@ -44,7 +42,6 @@ message StartMessage {
4442
}
4543

4644
// Type: 0x0000 + 1
47-
// Note: Acks to custom messages will have the `empty` field filled.
4845
message CompletionMessage {
4946
uint32 entry_index = 1;
5047

@@ -55,6 +52,11 @@ message CompletionMessage {
5552
};
5653
}
5754

55+
// Type: 0x0000 + 4
56+
message EntryAckMessage {
57+
uint32 entry_index = 1;
58+
}
59+
5860
// Type: 0x0000 + 2
5961
// Implementations MUST send this message when suspending an invocation.
6062
message SuspensionMessage {

src/journal.ts

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import {
1818
CLEAR_STATE_ENTRY_MESSAGE_TYPE,
1919
COMPLETE_AWAKEABLE_ENTRY_MESSAGE_TYPE,
2020
CompletionMessage,
21+
EntryAckMessage,
2122
GET_STATE_ENTRY_MESSAGE_TYPE,
2223
GetStateEntryMessage,
2324
INVOKE_ENTRY_MESSAGE_TYPE,
@@ -191,16 +192,23 @@ export class Journal<I, O> {
191192
journalEntry.resolve(m.empty);
192193
this.pendingJournalEntries.delete(m.entryIndex);
193194
} else {
194-
if (journalEntry.messageType === p.SIDE_EFFECT_ENTRY_MESSAGE_TYPE) {
195-
// Just needs and ack without completion
196-
journalEntry.resolve(undefined);
197-
this.pendingJournalEntries.delete(m.entryIndex);
198-
} else {
199-
//TODO completion message without a value/failure/empty and message is not a side effect
200-
}
195+
//TODO completion message without a value/failure/empty
201196
}
202197
}
203198

199+
public handleEntryAckMessage(m: EntryAckMessage) {
200+
// Get message at that entryIndex in pendingJournalEntries
201+
const journalEntry = this.pendingJournalEntries.get(m.entryIndex);
202+
203+
if (journalEntry === undefined) {
204+
return;
205+
}
206+
207+
// Just needs an ack
208+
journalEntry.resolve(undefined);
209+
this.pendingJournalEntries.delete(m.entryIndex);
210+
}
211+
204212
private handleReplay(
205213
journalIndex: number,
206214
replayMessage: Message,

src/state_machine.ts

Lines changed: 21 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import { rlog } from "./utils/logger";
1919
import { clearTimeout } from "timers";
2020
import {
2121
COMPLETION_MESSAGE_TYPE,
22+
ENTRY_ACK_MESSAGE_TYPE,
2223
ERROR_MESSAGE_TYPE,
2324
OUTPUT_STREAM_ENTRY_MESSAGE_TYPE,
2425
OutputStreamEntryMessage,
@@ -80,21 +81,30 @@ export class StateMachine<I, O> implements RestateStreamConsumer {
8081
return false;
8182
}
8283

83-
if (m.messageType !== COMPLETION_MESSAGE_TYPE) {
84+
if (m.messageType === COMPLETION_MESSAGE_TYPE) {
85+
rlog.debugJournalMessage(
86+
this.invocation.logPrefix,
87+
"Received completion message from Restate, adding to journal.",
88+
m.messageType,
89+
m.message
90+
);
91+
this.journal.handleRuntimeCompletionMessage(
92+
m.message as p.CompletionMessage
93+
);
94+
} else if (m.messageType === ENTRY_ACK_MESSAGE_TYPE) {
95+
rlog.debugJournalMessage(
96+
this.invocation.logPrefix,
97+
"Received entry ack message from Restate, adding to journal.",
98+
m.messageType,
99+
m.message
100+
);
101+
this.journal.handleEntryAckMessage(m.message as p.EntryAckMessage);
102+
} else {
84103
throw RetryableError.protocolViolation(
85-
`Received message of type ${m.messageType}. Can only accept completion messages after replay has finished.`
104+
`Received message of type ${m.messageType}. Can only accept completion or acks messages after replay has finished.`
86105
);
87106
}
88107

89-
rlog.debugJournalMessage(
90-
this.invocation.logPrefix,
91-
"Received completion message from Restate, adding to journal.",
92-
m.messageType,
93-
m.message
94-
);
95-
this.journal.handleRuntimeCompletionMessage(
96-
m.message as p.CompletionMessage
97-
);
98108
// Remove lingering suspension timeouts, if we are not waiting for completions anymore
99109
if (
100110
this.suspensionTimeout !== undefined &&

src/types/protocol.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import {
1616
ClearStateEntryMessage,
1717
CompleteAwakeableEntryMessage,
1818
CompletionMessage,
19+
EntryAckMessage,
1920
ErrorMessage,
2021
GetStateEntryMessage,
2122
InvokeEntryMessage,
@@ -43,13 +44,15 @@ export {
4344
SleepEntryMessage,
4445
StartMessage,
4546
SuspensionMessage,
47+
EntryAckMessage,
4648
} from "../generated/proto/protocol";
4749

4850
// Export the protocol message types as defined by the restate protocol.
4951
export const START_MESSAGE_TYPE = 0x0000n;
5052
export const COMPLETION_MESSAGE_TYPE = 0x0001n;
5153
export const SUSPENSION_MESSAGE_TYPE = 0x0002n;
5254
export const ERROR_MESSAGE_TYPE = 0x0003n;
55+
export const ENTRY_ACK_MESSAGE_TYPE = 0x0004n;
5356
export const POLL_INPUT_STREAM_ENTRY_MESSAGE_TYPE = 0x0400n;
5457
export const OUTPUT_STREAM_ENTRY_MESSAGE_TYPE = 0x0401n;
5558
export const GET_STATE_ENTRY_MESSAGE_TYPE = 0x0800n;
@@ -74,6 +77,7 @@ export const KNOWN_MESSAGE_TYPES = new Set([
7477
COMPLETION_MESSAGE_TYPE,
7578
SUSPENSION_MESSAGE_TYPE,
7679
ERROR_MESSAGE_TYPE,
80+
ENTRY_ACK_MESSAGE_TYPE,
7781
POLL_INPUT_STREAM_ENTRY_MESSAGE_TYPE,
7882
OUTPUT_STREAM_ENTRY_MESSAGE_TYPE,
7983
GET_STATE_ENTRY_MESSAGE_TYPE,
@@ -92,6 +96,7 @@ export const PROTOBUF_MESSAGE_NAME_BY_TYPE = new Map<bigint, string>([
9296
[COMPLETION_MESSAGE_TYPE, "CompletionMessage"],
9397
[SUSPENSION_MESSAGE_TYPE, "SuspensionMessage"],
9498
[ERROR_MESSAGE_TYPE, "ErrorMessage"],
99+
[ENTRY_ACK_MESSAGE_TYPE, "EntryAckMessage"],
95100
[POLL_INPUT_STREAM_ENTRY_MESSAGE_TYPE, "PollInputStreamEntryMessage"],
96101
[OUTPUT_STREAM_ENTRY_MESSAGE_TYPE, "OutputStreamEntryMessage"],
97102
[GET_STATE_ENTRY_MESSAGE_TYPE, "GetStateEntryMessage"],
@@ -111,6 +116,7 @@ const PROTOBUF_MESSAGES: Array<[bigint, any]> = [
111116
[COMPLETION_MESSAGE_TYPE, CompletionMessage],
112117
[SUSPENSION_MESSAGE_TYPE, SuspensionMessage],
113118
[ERROR_MESSAGE_TYPE, ErrorMessage],
119+
[ENTRY_ACK_MESSAGE_TYPE, EntryAckMessage],
114120
[POLL_INPUT_STREAM_ENTRY_MESSAGE_TYPE, PollInputStreamEntryMessage],
115121
[OUTPUT_STREAM_ENTRY_MESSAGE_TYPE, OutputStreamEntryMessage],
116122
[GET_STATE_ENTRY_MESSAGE_TYPE, GetStateEntryMessage],
@@ -131,6 +137,7 @@ export type ProtocolMessage =
131137
| CompletionMessage
132138
| SuspensionMessage
133139
| ErrorMessage
140+
| EntryAckMessage
134141
| PollInputStreamEntryMessage
135142
| OutputStreamEntryMessage
136143
| GetStateEntryMessage

src/types/types.ts

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,23 @@
1111

1212
import {
1313
AWAKEABLE_ENTRY_MESSAGE_TYPE,
14+
BACKGROUND_INVOKE_ENTRY_MESSAGE_TYPE,
15+
CLEAR_STATE_ENTRY_MESSAGE_TYPE,
16+
COMPLETE_AWAKEABLE_ENTRY_MESSAGE_TYPE,
17+
COMPLETION_MESSAGE_TYPE,
18+
ENTRY_ACK_MESSAGE_TYPE,
19+
ERROR_MESSAGE_TYPE,
1420
GET_STATE_ENTRY_MESSAGE_TYPE,
21+
INVOKE_ENTRY_MESSAGE_TYPE,
1522
KNOWN_MESSAGE_TYPES,
23+
OUTPUT_STREAM_ENTRY_MESSAGE_TYPE,
1624
POLL_INPUT_STREAM_ENTRY_MESSAGE_TYPE,
1725
ProtocolMessage,
26+
SET_STATE_ENTRY_MESSAGE_TYPE,
27+
SIDE_EFFECT_ENTRY_MESSAGE_TYPE,
1828
SLEEP_ENTRY_MESSAGE_TYPE,
1929
START_MESSAGE_TYPE,
30+
SUSPENSION_MESSAGE_TYPE,
2031
} from "./protocol";
2132

2233
export class Message {
@@ -53,19 +64,21 @@ class MessageType {
5364
return messageType == START_MESSAGE_TYPE;
5465
}
5566

56-
static isCustom(messageTypeId: bigint): boolean {
57-
return !KNOWN_MESSAGE_TYPES.has(messageTypeId);
58-
}
59-
60-
static hasRequiresAckFlag(messageTypeId: bigint): boolean {
61-
return this.isCustom(messageTypeId);
67+
static hasRequiresAckFlag(messageType: bigint): boolean {
68+
return (
69+
messageType !== START_MESSAGE_TYPE &&
70+
messageType !== ERROR_MESSAGE_TYPE &&
71+
messageType !== SUSPENSION_MESSAGE_TYPE &&
72+
messageType !== ENTRY_ACK_MESSAGE_TYPE &&
73+
messageType !== COMPLETION_MESSAGE_TYPE
74+
);
6275
}
6376
}
6477

6578
const CUSTOM_MESSAGE_MASK = BigInt(0xfc00);
6679
const COMPLETED_MASK = BigInt(0x0001_0000_0000);
6780
const VERSION_MASK = BigInt(0x03ff_0000_0000);
68-
const REQUIRES_ACK_MASK = BigInt(0x0001_0000_0000);
81+
const REQUIRES_ACK_MASK = BigInt(0x8000_0000_0000);
6982

7083
// The header is exported but only for tests.
7184
export class Header {

test/protoutils.ts

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@ import {
4141
SuspensionMessage,
4242
ERROR_MESSAGE_TYPE,
4343
ErrorMessage,
44+
ENTRY_ACK_MESSAGE_TYPE,
45+
EntryAckMessage,
4446
} from "../src/types/protocol";
4547
import { Message } from "../src/types/types";
4648
import { TestRequest, TestResponse } from "../src/generated/proto/test";
@@ -235,6 +237,15 @@ export function completionMessage(
235237
}
236238
}
237239

240+
export function ackMessage(index: number): Message {
241+
return new Message(
242+
ENTRY_ACK_MESSAGE_TYPE,
243+
EntryAckMessage.create({
244+
entryIndex: index,
245+
})
246+
);
247+
}
248+
238249
export function invokeMessage(
239250
serviceName: string,
240251
methodName: string,

0 commit comments

Comments
 (0)