Skip to content

Commit a18b44c

Browse files
committed
Use BufferedConnection in the http_connection
This commit overall improves the performances of sending journal entries to the runtime by 4x, by reducing the number of temporary buffer allocations, promises, and intermediate serialization.
1 parent c3d84ac commit a18b44c

File tree

8 files changed

+104
-60
lines changed

8 files changed

+104
-60
lines changed
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
import { encodeMessages } from "../io/encoder";
2+
import { Message } from "../types/types";
3+
import { Connection } from "./connection";
4+
5+
export class BufferedConnection implements Connection {
6+
private queue: Message[] = [];
7+
private flushing: Promise<void> = Promise.resolve();
8+
9+
constructor(private readonly flushFn: (buffer: Buffer) => Promise<void>) {}
10+
11+
send(msg: Message): Promise<void> {
12+
const len = this.queue.push(msg);
13+
if (len === 1) {
14+
// we are the first in line, therefore we schedule a flush,
15+
// BUT we must wait for the previous flush to end.
16+
this.flushing = this.flushing.then(() => this.scheduleFlush());
17+
}
18+
// we don't need to reschedule the `flush` here,
19+
// because the flush happens anyway at the end of the current event loop iteration.
20+
// tag along to the previously scheduled flush.
21+
return this.flushing;
22+
}
23+
24+
end(): Promise<void> {
25+
this.flushing = this.flushing.then(() => this.flush());
26+
return this.flushing;
27+
}
28+
29+
private scheduleFlush(): Promise<void> {
30+
// schedule a flush at the end of the current event loop iteration.
31+
return new Promise((resolve, reject) =>
32+
setImmediate(() => {
33+
this.flush().then(resolve).catch(reject);
34+
})
35+
);
36+
}
37+
38+
private async flush(): Promise<void> {
39+
if (this.queue.length === 0) {
40+
return Promise.resolve();
41+
}
42+
const buffer = encodeMessages(this.queue) as Buffer;
43+
this.queue = [];
44+
45+
return this.flushFn(buffer);
46+
}
47+
}

src/connection/embedded_connection.ts

Lines changed: 8 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,8 @@
1010
*/
1111

1212
import { RemoteContext } from "../generated/proto/services";
13-
import { encodeMessages } from "../io/encoder";
1413
import { Message } from "../types/types";
14+
import { BufferedConnection } from "./buffered_connection";
1515
import { Connection } from "./connection";
1616

1717
export class FencedOffError extends Error {
@@ -27,47 +27,25 @@ export class InvocationAlreadyCompletedError extends Error {
2727
}
2828

2929
export class EmbeddedConnection implements Connection {
30-
private queue: Message[] = [];
31-
private flushing: Promise<void> = Promise.resolve();
30+
private buffered: BufferedConnection;
3231

3332
constructor(
3433
private readonly operationId: string,
3534
private readonly streamId: string,
3635
private readonly remote: RemoteContext
37-
) {}
36+
) {
37+
this.buffered = new BufferedConnection((buffer) => this.sendBuffer(buffer));
38+
}
3839

3940
send(msg: Message): Promise<void> {
40-
const len = this.queue.push(msg);
41-
if (len === 1) {
42-
// we are the first in line, therefore we schedule a flush,
43-
// BUT we must wait for the previous flush to end.
44-
this.flushing = this.flushing.then(() => this.scheduleFlush());
45-
}
46-
// tag along to the previously scheduled flush.
47-
return this.flushing;
41+
return this.buffered.send(msg);
4842
}
4943

5044
end(): Promise<void> {
51-
this.flushing = this.flushing.then(() => this.flush());
52-
return this.flushing;
53-
}
54-
55-
private scheduleFlush(): Promise<void> {
56-
// schedule a flush at the end of the current event loop iteration.
57-
return new Promise((resolve, reject) =>
58-
setImmediate(() => {
59-
this.flush().then(resolve).catch(reject);
60-
})
61-
);
45+
return this.buffered.end();
6246
}
6347

64-
private async flush(): Promise<void> {
65-
if (this.queue.length === 0) {
66-
return Promise.resolve();
67-
}
68-
const buffer = encodeMessages(this.queue) as Buffer;
69-
this.queue = [];
70-
48+
private async sendBuffer(buffer: Buffer): Promise<void> {
7149
const res = await this.remote.send({
7250
operationId: this.operationId,
7351
streamId: this.streamId,

src/connection/http_connection.ts

Lines changed: 16 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,12 @@
1010
*/
1111

1212
import stream from "stream";
13-
import { encodeMessage } from "../io/encoder";
1413
import { streamDecoder } from "../io/decoder";
1514
import { Connection, RestateStreamConsumer } from "./connection";
1615
import { Message } from "../types/types";
1716
import { rlog } from "../utils/logger";
1817
import { finished } from "stream/promises";
18+
import { BufferedConnection } from "./buffered_connection";
1919

2020
// utility promise, for cases where we want to save allocation of an extra promise
2121
const RESOLVED: Promise<void> = Promise.resolve();
@@ -55,19 +55,25 @@ export class RestateHttp2Connection implements Connection {
5555
// input as decoded messages
5656
private readonly sdkInput: stream.Readable;
5757

58-
// output as encoded bytes. we convert manually, not as transforms,
59-
// to skip a layer of stream indirection
60-
private readonly sdkOutput: stream.Writable;
61-
6258
// consumer handling
6359
private currentConsumer: RestateStreamConsumer | null = null;
6460
private inputBuffer: Message[] = [];
6561
private consumerError?: Error;
6662
private consumerInputClosed = false;
6763

64+
private outputBuffer: BufferedConnection;
65+
6866
constructor(private readonly rawStream: stream.Duplex) {
6967
this.sdkInput = rawStream.pipe(streamDecoder());
70-
this.sdkOutput = rawStream;
68+
69+
this.outputBuffer = new BufferedConnection((buffer) => {
70+
const hasMoreCapacity = rawStream.write(buffer);
71+
if (hasMoreCapacity) {
72+
return RESOLVED;
73+
} else {
74+
return new Promise((resolve) => rawStream.once("drain", resolve));
75+
}
76+
});
7177

7278
// remember and forward messages
7379
this.sdkInput.on("data", (m: Message) => {
@@ -201,23 +207,16 @@ export class RestateHttp2Connection implements Connection {
201207
* capacity again, so that at least the operations that await results will respect backpressure.
202208
*/
203209
public send(msg: Message): Promise<void> {
204-
const encodedMessage: Uint8Array = encodeMessage(msg);
205-
206-
const hasMoreCapacity = this.sdkOutput.write(encodedMessage);
207-
if (hasMoreCapacity) {
208-
return RESOLVED;
209-
}
210-
211-
return new Promise((resolve) => {
212-
this.sdkOutput.once("drain", resolve);
213-
});
210+
return this.outputBuffer.send(msg);
214211
}
215212

216213
/**
217214
* Ends the stream, awaiting pending writes.
218215
*/
219216
public async end(): Promise<void> {
220-
this.sdkOutput.end();
217+
await this.outputBuffer.end();
218+
219+
this.rawStream.end();
221220

222221
const options = {
223222
error: true,

src/io/encoder.ts

Lines changed: 22 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import stream from "stream";
1313
import { PROTOBUF_MESSAGE_BY_TYPE } from "../types/protocol";
1414
import { Header, Message } from "../types/types";
15+
import _m0 from "protobufjs/minimal";
1516

1617
export function streamEncoder(): stream.Transform {
1718
return new stream.Transform({
@@ -32,7 +33,11 @@ export function encodeMessage(msg: Message): Uint8Array {
3233
}
3334

3435
export function encodeMessages(messages: Message[]): Uint8Array {
35-
const chunks: Buffer[] = [];
36+
const offsets = [];
37+
const headers = [];
38+
let off = 0;
39+
40+
const writer = _m0.Writer.create();
3641
for (const message of messages) {
3742
const pbType = PROTOBUF_MESSAGE_BY_TYPE.get(BigInt(message.messageType));
3843
if (pbType === undefined) {
@@ -41,19 +46,27 @@ export function encodeMessages(messages: Message[]): Uint8Array {
4146
message.messageType
4247
);
4348
}
44-
const bodyBuf = pbType.encode(message.message).finish();
49+
offsets.push(off);
50+
writer.fixed64(0);
51+
pbType.encode(message.message, writer);
52+
const messageLen = writer.len - 8 - off;
53+
off = writer.len;
54+
4555
const header = new Header(
4656
BigInt(message.messageType),
47-
bodyBuf.length,
57+
messageLen,
4858
message.completed,
4959
message.protocolVersion, // only set for incoming start message
5060
message.requiresAck
5161
);
52-
const encoded = header.toU64be();
53-
const headerBuf = Buffer.alloc(8);
54-
headerBuf.writeBigUInt64BE(encoded);
55-
chunks.push(headerBuf);
56-
chunks.push(bodyBuf);
62+
const header64 = header.toU64be();
63+
headers.push(header64);
64+
}
65+
const buffer = writer.finish() as Buffer;
66+
for (let i = 0; i < offsets.length; i++) {
67+
const offset = offsets[i];
68+
const header = headers[i];
69+
buffer.writeBigUInt64BE(header, offset);
5770
}
58-
return Buffer.concat(chunks);
71+
return buffer;
5972
}

src/journal.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ import { SideEffectEntryMessage } from "./generated/proto/javascript";
3939
import { Invocation } from "./invocation";
4040
import { failureToError, RetryableError } from "./types/errors";
4141

42+
const RESOLVED = Promise.resolve(undefined);
43+
4244
export class Journal<I, O> {
4345
private state = NewExecutionState.REPLAYING;
4446

@@ -113,7 +115,7 @@ export class Journal<I, O> {
113115
case p.COMPLETE_AWAKEABLE_ENTRY_MESSAGE_TYPE:
114116
case p.BACKGROUND_INVOKE_ENTRY_MESSAGE_TYPE: {
115117
// Do not need completion
116-
return Promise.resolve(undefined);
118+
return RESOLVED;
117119
}
118120
case p.GET_STATE_ENTRY_MESSAGE_TYPE: {
119121
const getStateMsg = message as GetStateEntryMessage;

src/restate_context_impl.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -444,7 +444,10 @@ async function executeWithRetries<T>(
444444
// - they are not retried within the service, because they will never succeed within this service,
445445
// but can only succeed within a new invocation going to service with fixed code
446446
// we hence break the retries here similar to terminal errors
447-
if (e instanceof RestateError && e.code == RestateErrorCodes.JOURNAL_MISMATCH) {
447+
if (
448+
e instanceof RestateError &&
449+
e.code == RestateErrorCodes.JOURNAL_MISMATCH
450+
) {
448451
throw e;
449452
}
450453

src/types/errors.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -225,7 +225,9 @@ export class RetryableError extends RestateError {
225225
- In the replayed messages: type: ${
226226
replayMessage.messageType
227227
}, message: ${printMessageAsJson(replayMessage.message)}`;
228-
return new RetryableError(msg, { errorCode: RestateErrorCodes.JOURNAL_MISMATCH });
228+
return new RetryableError(msg, {
229+
errorCode: RestateErrorCodes.JOURNAL_MISMATCH,
230+
});
229231
}
230232

231233
public static protocolViolation(message: string) {

test/protoutils.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ import {
5555
import { expect } from "@jest/globals";
5656
import { jsonSerialize, printMessageAsJson } from "../src/utils/utils";
5757
import { rlog } from "../src/utils/logger";
58-
import {ErrorCodes, RestateErrorCodes} from "../src/types/errors";
58+
import { ErrorCodes, RestateErrorCodes } from "../src/types/errors";
5959

6060
export function startMessage(
6161
knownEntries?: number,

0 commit comments

Comments
 (0)