Skip to content

Commit dc5d77a

Browse files
ctx.invoke/invokeOneWay now accept a serialization and deserialization function. (#402)
This simplifies a bit the problem with CombineablePromise, where now we first transform the result deserializing it, and then we wrap it in a combineable promise. It also makes consistent that the ser/de concern is within the invoke/invokeOneWay, and not on the caller.
1 parent 2e9baa7 commit dc5d77a

File tree

1 file changed

+46
-24
lines changed

1 file changed

+46
-24
lines changed

packages/restate-sdk/src/context_impl.ts

Lines changed: 46 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -259,42 +259,53 @@ export class ContextImpl implements ObjectContext, WorkflowContext {
259259
// --- Calls, background calls, etc
260260

261261
// DON'T make this function async!!! see sideEffect comment for details.
262-
private invoke(
262+
private invoke<REQ = Uint8Array, RES = Uint8Array>(
263263
service: string,
264264
method: string,
265-
data: Uint8Array,
266-
key?: string
267-
/* eslint-disable-next-line @typescript-eslint/no-explicit-any */
268-
): InternalCombineablePromise<any> {
265+
data: REQ,
266+
key?: string,
267+
serializer?: (req: REQ) => Uint8Array,
268+
deserializer?: (res: Uint8Array) => RES
269+
): InternalCombineablePromise<RES> {
269270
this.checkState("invoke");
270271

271272
const msg = new CallEntryMessage({
272273
serviceName: service,
273274
handlerName: method,
274-
parameter: data,
275+
parameter: serializer ? serializer(data) : (data as Uint8Array),
275276
key,
276277
});
278+
277279
return this.markCombineablePromise(
278-
this.stateMachine
279-
.handleUserCodeMessage(INVOKE_ENTRY_MESSAGE_TYPE, msg)
280-
.transform((v): any => deserializeJson(v as Uint8Array))
280+
(
281+
this.stateMachine.handleUserCodeMessage(
282+
INVOKE_ENTRY_MESSAGE_TYPE,
283+
msg
284+
) as WrappedPromise<Uint8Array>
285+
).transform((res) => {
286+
if (deserializer) {
287+
return deserializer(res);
288+
}
289+
return res;
290+
}) as WrappedPromise<RES>
281291
);
282292
}
283293

284-
private async invokeOneWay(
294+
private async invokeOneWay<REQ = Uint8Array>(
285295
service: string,
286296
method: string,
287-
data: Uint8Array,
297+
data: REQ,
298+
serializer?: (req: REQ) => Uint8Array,
288299
delay?: number,
289300
key?: string
290-
): Promise<Uint8Array> {
301+
): Promise<void> {
291302
const actualDelay = delay || 0;
292303
const invokeTime =
293304
actualDelay > 0 ? Date.now() + actualDelay : protoInt64.zero;
294305
const msg = new OneWayCallEntryMessage({
295306
serviceName: service,
296307
handlerName: method,
297-
parameter: data,
308+
parameter: serializer ? serializer(data) : (data as Uint8Array),
298309
invokeTime: protoInt64.parse(invokeTime),
299310
key,
300311
});
@@ -303,7 +314,6 @@ export class ContextImpl implements ObjectContext, WorkflowContext {
303314
BACKGROUND_INVOKE_ENTRY_MESSAGE_TYPE,
304315
msg
305316
);
306-
return new Uint8Array();
307317
}
308318

309319
serviceClient<D>({ name }: ServiceDefinitionFrom<D>): Client<Service<D>> {
@@ -313,8 +323,14 @@ export class ContextImpl implements ObjectContext, WorkflowContext {
313323
get: (_target, prop) => {
314324
const route = prop as string;
315325
return (...args: unknown[]) => {
316-
const requestBytes = serializeJson(args.shift());
317-
return this.invoke(name, route, requestBytes);
326+
return this.invoke(
327+
name,
328+
route,
329+
args.shift(),
330+
undefined,
331+
serializeJson,
332+
deserializeJson
333+
);
318334
};
319335
},
320336
}
@@ -333,8 +349,14 @@ export class ContextImpl implements ObjectContext, WorkflowContext {
333349
get: (_target, prop) => {
334350
const route = prop as string;
335351
return (...args: unknown[]) => {
336-
const requestBytes = serializeJson(args.shift());
337-
return this.invoke(name, route, requestBytes, key);
352+
return this.invoke(
353+
name,
354+
route,
355+
args.shift(),
356+
key,
357+
serializeJson,
358+
deserializeJson
359+
);
338360
};
339361
},
340362
}
@@ -353,11 +375,11 @@ export class ContextImpl implements ObjectContext, WorkflowContext {
353375
get: (_target, prop) => {
354376
const route = prop as string;
355377
return (...args: unknown[]) => {
356-
const requestBytes = serializeJson(args.shift());
357378
this.invokeOneWay(
358379
service.name,
359380
route,
360-
requestBytes,
381+
args.shift(),
382+
serializeJson,
361383
opts?.delay
362384
).catch((e) => {
363385
this.stateMachine.handleDanglingPromiseError(e as Error);
@@ -381,11 +403,11 @@ export class ContextImpl implements ObjectContext, WorkflowContext {
381403
get: (_target, prop) => {
382404
const route = prop as string;
383405
return (...args: unknown[]) => {
384-
const requestBytes = serializeJson(args.shift());
385406
this.invokeOneWay(
386407
obj.name,
387408
route,
388-
requestBytes,
409+
args.shift(),
410+
serializeJson,
389411
opts?.delay,
390412
key
391413
).catch((e) => {
@@ -410,11 +432,11 @@ export class ContextImpl implements ObjectContext, WorkflowContext {
410432
get: (_target, prop) => {
411433
const route = prop as string;
412434
return (...args: unknown[]) => {
413-
const requestBytes = serializeJson(args.shift());
414435
this.invokeOneWay(
415436
def.name,
416437
route,
417-
requestBytes,
438+
args.shift(),
439+
serializeJson,
418440
opts?.delay,
419441
key
420442
).catch((e) => {

0 commit comments

Comments
 (0)