Skip to content

Commit ab842b9

Browse files
committed
Support HTTP2 client
1 parent d8d6662 commit ab842b9

File tree

10 files changed

+488
-249
lines changed

10 files changed

+488
-249
lines changed

examples/embedded_example.ts

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,14 +26,14 @@ app.post("/workflow", async (req: Request, res: Response) => {
2626
id,
2727
input: name,
2828
handler: async (ctx, name) => {
29-
const p1 = await ctx.sideEffect(async () => `Hello ${name}!`);
30-
const p2 = await ctx.sideEffect(async () => `Bonjour ${name}`);
31-
const p3 = await ctx.sideEffect(async () => `Hi ${name}`);
32-
// const p4 = await ctx
33-
// .rpc<{ greet: (name: string) => Promise<string> }>({ path: "greeter" })
29+
const p1 = ctx.sideEffect(async () => `Hello ${name}!`);
30+
const p2 = ctx.sideEffect(async () => `Bonjour ${name}`);
31+
const p3 = ctx.sideEffect(async () => `Hi ${name}`);
32+
//const p4 = ctx
33+
// .rpc<{ greet: (name: string) => Promise<string> }>({ path: "greeter" })
3434
// .greet(name);
3535

36-
return p1 + p2 + p3; //+ p4;
36+
return (await p1) + (await p2) + (await p3);
3737
},
3838
});
3939

proto/services.proto

Lines changed: 182 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,182 @@
1+
// Copyright (c) 2023 - Restate Software, Inc., Restate GmbH
2+
//
3+
// This file is part of the Restate service protocol, which is
4+
// released under the MIT license.
5+
//
6+
// You can find a copy of the license in file LICENSE in the root
7+
// directory of this repository or package, or at
8+
// https://github.com/restatedev/proto/blob/main/LICENSE
9+
10+
syntax = "proto3";
11+
12+
/*
13+
This package contains internal service interfaces
14+
*/
15+
package dev.restate.internal;
16+
17+
import "dev/restate/ext.proto";
18+
import "google/protobuf/empty.proto";
19+
import "google/protobuf/struct.proto";
20+
21+
service Proxy {
22+
// Proxy invocation through this service. This service is mostly used for proxying invocations through a specific partition processor, e.g. to reuse the deduplication id map.
23+
rpc ProxyThrough(ProxyThroughRequest) returns (google.protobuf.Empty);
24+
}
25+
26+
message ProxyThroughRequest {
27+
string target_service = 1;
28+
string target_method = 2;
29+
bytes target_key = 3;
30+
bytes target_invocation_uuid = 4;
31+
32+
bytes input = 5;
33+
}
34+
35+
// RemoteContext service to implement the embedded handler API
36+
service RemoteContext {
37+
option (dev.restate.ext.service_type) = KEYED;
38+
39+
// Start a new invocation, or resume a previously existing one.
40+
//
41+
// If another client is already executing this invocation, it will be fenced off and this client will take precedence.
42+
//
43+
// This method is not idempotent.
44+
rpc Start(StartRequest) returns (StartResponse);
45+
46+
// Send new messages to append to the message stream.
47+
//
48+
// This method is not idempotent, and a request can fail for several reasons,
49+
// including errors in sent messages, or some other transient error.
50+
// The client should consider the stream in an unrecoverable error state and it can retry
51+
// by creating a new stream through Start() with a different stream_id.
52+
//
53+
// Once the invocation is completed, subsequent Send fail.
54+
rpc Send(SendRequest) returns (SendResponse);
55+
56+
// Receive new messages from the message stream.
57+
//
58+
// This method is not idempotent, and a request can fail for several reasons,
59+
// including errors in sent messages, or some other transient error.
60+
// The client should consider the stream in an unrecoverable error state and it can retry
61+
// by creating a new stream through Start() with a different stream_id.
62+
//
63+
// If the invocation is completed, Recv returns a response with messages field empty.
64+
rpc Recv(RecvRequest) returns (RecvResponse);
65+
66+
// Get the result of the invocation.
67+
//
68+
// In case another client is executing the invocation (through a sequence of Start/Send/Recv), this method will block
69+
// until a response is computed.
70+
// In case the response is already available, it will return immediately with the response.
71+
// In case no client is executing the invocation, that is no client ever invoked Start() for the given operation_id,
72+
// this method will return response.none.
73+
//
74+
// This method can be safely invoked by multiple clients and it's idempotent.
75+
rpc GetResult(GetResultRequest) returns (GetResultResponse);
76+
77+
// Cleanup all the state of the invocation, excluding the user state.
78+
//
79+
// This is automatically executed when retention_period_sec is past, but it can be manually invoked before the expiry time elapsed.
80+
rpc Cleanup(CleanupRequest) returns (google.protobuf.Empty);
81+
}
82+
83+
message StartRequest {
84+
// User provided operation id, this is used as idempotency key.
85+
string operation_id = 1 [(dev.restate.ext.field) = KEY];
86+
87+
// Stream id to uniquely identify a open stream between client and Restate.
88+
// There can be at most one open stream at the same time.
89+
string stream_id = 2;
90+
91+
// Retention period for the response in seconds.
92+
// After the invocation completes, the response will be persisted for the given duration.
93+
// Afterwards, the system will cleanup the response and treats any subsequent invocation with same operation_id as new.
94+
//
95+
// If not set, 30 minutes will be used as retention period.
96+
uint32 retention_period_sec = 3;
97+
98+
// Argument of the invocation
99+
bytes argument = 4;
100+
}
101+
102+
message StartResponse {
103+
oneof invocation_status {
104+
// Contains the concatenated first messages of the stream, encoded using the same framing used by service-protocol
105+
bytes executing = 1;
106+
107+
// Contains the result of the invocation
108+
GetResultResponse completed = 2;
109+
}
110+
}
111+
112+
message SendRequest {
113+
// User provided operation id, this is used as idempotency key.
114+
string operation_id = 1 [(dev.restate.ext.field) = KEY];
115+
116+
// Stream id to uniquely identify a open stream between client and Restate.
117+
// There can be at most one open stream at the same time.
118+
string stream_id = 2;
119+
120+
// Contains the concatenated messages of the stream, encoded using the same framing used by service-protocol
121+
bytes messages = 3;
122+
}
123+
124+
message SendResponse {
125+
oneof response {
126+
google.protobuf.Empty ok = 1;
127+
128+
// This means the provided stream_id is invalid, and it should not be reused,
129+
// nor the client should create a new stream using Start().
130+
// The client can instead read the invocation result using GetResult().
131+
google.protobuf.Empty invalid_stream = 2;
132+
}
133+
}
134+
135+
message RecvRequest {
136+
// User provided operation id, this is used as idempotency key.
137+
string operation_id = 1 [(dev.restate.ext.field) = KEY];
138+
139+
// Stream id to uniquely identify a open stream between client and Restate.
140+
// There can be at most one open stream at the same time.
141+
string stream_id = 2;
142+
}
143+
144+
message RecvResponse {
145+
oneof response {
146+
// Contains the concatenated messages of the stream, encoded using the same framing used by service-protocol
147+
bytes messages = 1;
148+
149+
// This means the provided stream_id is invalid, and it should not be reused,
150+
// nor the client should create a new stream using Start().
151+
// The client can instead read the invocation result using GetResult().
152+
google.protobuf.Empty invalid_stream = 2;
153+
}
154+
}
155+
156+
message GetResultRequest {
157+
// User provided operation id, this is used as idempotency key.
158+
string operation_id = 1 [(dev.restate.ext.field) = KEY];
159+
}
160+
161+
message GetResultResponse {
162+
message InvocationFailure {
163+
uint32 code = 1;
164+
string message = 2;
165+
}
166+
167+
oneof response {
168+
// See GetResult documentation
169+
google.protobuf.Empty none = 1;
170+
bytes success = 2;
171+
InvocationFailure failure = 3;
172+
}
173+
174+
// Timestamp of the response expiry time in RFC3339.
175+
// Empty if response = none
176+
string expiry_time = 15;
177+
}
178+
179+
message CleanupRequest {
180+
// User provided operation id, this is used as idempotency key.
181+
string operation_id = 1 [(dev.restate.ext.field) = KEY];
182+
}
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
import { RemoteContext } from "../generated/proto/services";
2+
import { encodeMessages } from "../io/encoder";
3+
import { Message } from "../types/types";
4+
import { Connection } from "./connection";
5+
6+
export class FencedOffError extends Error {
7+
constructor() {
8+
super("FencedOff");
9+
}
10+
}
11+
12+
export class EmbeddedConnection implements Connection {
13+
private queue: Message[] = [];
14+
private flushing: Promise<void> = Promise.resolve();
15+
16+
constructor(
17+
private readonly operationId: string,
18+
private readonly streamId: string,
19+
private readonly remote: RemoteContext
20+
) {}
21+
22+
send(msg: Message): Promise<void> {
23+
const len = this.queue.push(msg);
24+
if (len === 1) {
25+
// we are the first in line, therefore we schedule a flush,
26+
// BUT we must wait for the previous flush to end.
27+
//
28+
this.flushing = this.flushing.then(() => this.scheduleFlush());
29+
}
30+
// tag along to the previously scheduled flush.
31+
return this.flushing;
32+
}
33+
34+
end(): Promise<void> {
35+
return this.flush();
36+
}
37+
38+
private scheduleFlush(): Promise<void> {
39+
// schedule a flush at the end of the current event loop iteration.
40+
return new Promise((resolve, reject) =>
41+
setImmediate(() => {
42+
this.flush().then(resolve).catch(reject);
43+
})
44+
);
45+
}
46+
47+
private async flush(): Promise<void> {
48+
if (this.queue.length === 0) {
49+
return Promise.resolve();
50+
}
51+
const buffer = encodeMessages(this.queue) as Buffer;
52+
this.queue = [];
53+
54+
const res = await this.remote.send({
55+
operationId: this.operationId,
56+
streamId: this.streamId,
57+
messages: buffer,
58+
});
59+
60+
if (!res.ok) {
61+
throw new Error("Error connecting to restate");
62+
}
63+
if (res.invalidStream !== undefined) {
64+
throw new FencedOffError();
65+
}
66+
}
67+
}

src/embedded/api.ts

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
import { RpcContext } from "../restate_context";
2+
import { doInvoke } from "./invocation";
3+
import { wrapHandler } from "./handler";
4+
import crypto from "crypto";
5+
import { RemoteContext } from "../generated/proto/services";
6+
import { bufConnectRemoteContext } from "./http2_remote";
7+
8+
export type RestateConnectionOptions = {
9+
ingress: string;
10+
};
11+
12+
export type RestateInvocationOptions<I, O> = {
13+
id: string;
14+
handler: (ctx: RpcContext, input: I) => Promise<O>;
15+
input: I;
16+
retain?: number;
17+
};
18+
19+
export const connection = (opts: RestateConnectionOptions): RestateConnection =>
20+
new RestateConnection(opts);
21+
22+
export class RestateConnection {
23+
private remote: RemoteContext;
24+
25+
constructor(readonly opts: RestateConnectionOptions) {
26+
this.remote = bufConnectRemoteContext(opts.ingress);
27+
}
28+
29+
public invoke<I, O>(opt: RestateInvocationOptions<I, O>): Promise<O> {
30+
const method = wrapHandler(opt.handler);
31+
const streamId = crypto.randomUUID();
32+
return doInvoke<I, O>(this.remote, opt.id, streamId, method, opt.input);
33+
}
34+
}

src/embedded/client.ts

Lines changed: 0 additions & 34 deletions
This file was deleted.

0 commit comments

Comments
 (0)