Skip to content

Commit 489d342

Browse files
committed
Adjust to the latest service definition
1 parent 88006ac commit 489d342

File tree

3 files changed

+25
-23
lines changed

3 files changed

+25
-23
lines changed

proto/services.proto

Lines changed: 6 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -18,20 +18,6 @@ import "dev/restate/ext.proto";
1818
import "google/protobuf/empty.proto";
1919
import "google/protobuf/struct.proto";
2020

21-
service Proxy {
22-
// Proxy invocation through this service. This service is mostly used for proxying invocations through a specific partition processor, e.g. to reuse the deduplication id map.
23-
rpc ProxyThrough(ProxyThroughRequest) returns (google.protobuf.Empty);
24-
}
25-
26-
message ProxyThroughRequest {
27-
string target_service = 1;
28-
string target_method = 2;
29-
bytes target_key = 3;
30-
bytes target_invocation_uuid = 4;
31-
32-
bytes input = 5;
33-
}
34-
3521
// RemoteContext service to implement the embedded handler API
3622
service RemoteContext {
3723
option (dev.restate.ext.service_type) = KEYED;
@@ -129,6 +115,9 @@ message SendResponse {
129115
// nor the client should create a new stream using Start().
130116
// The client can instead read the invocation result using GetResult().
131117
google.protobuf.Empty invalid_stream = 2;
118+
119+
// This means the invocation is completed, and the result should be collected using GetResult
120+
google.protobuf.Empty invocation_completed = 3;
132121
}
133122
}
134123

@@ -150,6 +139,9 @@ message RecvResponse {
150139
// nor the client should create a new stream using Start().
151140
// The client can instead read the invocation result using GetResult().
152141
google.protobuf.Empty invalid_stream = 2;
142+
143+
// This means the invocation is completed, and the result should be collected using GetResult
144+
google.protobuf.Empty invocation_completed = 3;
153145
}
154146
}
155147

src/connection/embedded_connection.ts

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,12 @@ export class FencedOffError extends Error {
99
}
1010
}
1111

12+
export class InvocationAlreadyCompletedError extends Error {
13+
constructor() {
14+
super("Completed");
15+
}
16+
}
17+
1218
export class EmbeddedConnection implements Connection {
1319
private queue: Message[] = [];
1420
private flushing: Promise<void> = Promise.resolve();
@@ -57,11 +63,11 @@ export class EmbeddedConnection implements Connection {
5763
messages: buffer,
5864
});
5965

60-
if (!res.ok) {
61-
throw new Error("Error connecting to restate");
62-
}
6366
if (res.invalidStream !== undefined) {
6467
throw new FencedOffError();
6568
}
69+
if (res.invocationCompleted !== undefined) {
70+
throw new InvocationAlreadyCompletedError();
71+
}
6672
}
6773
}

src/embedded/invocation.ts

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,10 @@ import { InvocationBuilder } from "../invocation";
1616
import { HostedGrpcServiceMethod } from "../types/grpc";
1717
import { StateMachine } from "../state_machine";
1818
import { ProtocolMode } from "../generated/proto/discovery";
19-
import { RequestError } from "./http2_remote";
20-
import { EmbeddedConnection } from "../connection/embedded_connection";
19+
import {
20+
EmbeddedConnection,
21+
FencedOffError,
22+
} from "../connection/embedded_connection";
2123

2224
export const doInvoke = async <I, O>(
2325
remote: RemoteContext,
@@ -78,16 +80,18 @@ export const doInvoke = async <I, O>(
7880
streamId,
7981
});
8082
if (recv.invalidStream !== undefined) {
81-
throw new Error("Operation fenced off");
83+
throw new FencedOffError();
84+
}
85+
if (recv.invocationCompleted !== undefined) {
86+
break;
8287
}
8388
const buffer = recv.messages ?? Buffer.alloc(0);
8489
const messages = decodeMessagesBuffer(buffer);
8590
messages.forEach((m: Message) => stateMachine.handleMessage(m));
8691
}
8792
} catch (e) {
88-
if (!(e instanceof RequestError) || !e.precondtionFailed()) {
89-
stateMachine.handleStreamError(e as Error);
90-
}
93+
stateMachine.handleStreamError(e as Error);
94+
throw e;
9195
}
9296

9397
//

0 commit comments

Comments
 (0)