Skip to content

Commit 54b50ab

Browse files
authored
fix: handle server-sent termination in RPC (#432)
1 parent 86f1134 commit 54b50ab

File tree

1 file changed

+21
-0
lines changed

1 file changed

+21
-0
lines changed

lib/eventstream_rpc.ts

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -785,6 +785,13 @@ class OperationBase extends EventEmitter {
785785
* @return the operation's underlying event stream binding object
786786
*/
787787
getStream() : eventstream.ClientStream { return this.stream; }
788+
789+
/**
790+
* Set this operation state to be "Ended" so that closing the operation will not send a terminate message.
791+
*/
792+
setStateEnded() {
793+
this.state = OperationState.Ended;
794+
}
788795
}
789796

790797
/**
@@ -837,6 +844,11 @@ export class RequestResponseOperation<RequestType, ResponseType> extends EventEm
837844
await this.operation.activate(requestMessage);
838845

839846
let message : eventstream.Message = await responsePromise;
847+
848+
// If the server terminated the stream, then set the operation to be ended immediately
849+
if ((message.flags ?? 0) & eventstream.MessageFlags.TerminateStream) {
850+
this.operation.setStateEnded();
851+
}
840852
let response : ResponseType = deserializeResponse(this.serviceModel, this.operationConfig.name, message);
841853

842854
resolve(response);
@@ -928,6 +940,15 @@ export class StreamingOperation<RequestType, ResponseType, OutboundMessageType,
928940
let message : eventstream.Message = await responsePromise;
929941
let response : ResponseType = deserializeResponse(this.serviceModel, this.operationConfig.name, message);
930942

943+
// If the server terminated the stream, then set the operation to be ended immediately
944+
if ((message.flags ?? 0) & eventstream.MessageFlags.TerminateStream) {
945+
this.operation.setStateEnded();
946+
// Server hung up on us. Immediately cleanup the operation state.
947+
// Do this before resolving the promise so that any user-initiated
948+
// requests will see the correct state, which is that the operation is closed.
949+
await this.close();
950+
}
951+
931952
resolve(response);
932953
} catch (e) {
933954
await this.close();

0 commit comments

Comments
 (0)