Skip to content

Commit a023351

Browse files
committed
fix channel's payload.data deserialization
Signed-off-by: Palani C <[email protected]>
1 parent 0f3b333 commit a023351

File tree

2 files changed

+70
-7
lines changed

2 files changed

+70
-7
lines changed

packages/rsocket-core/src/RSocketMachine.js

+7-7
Original file line numberDiff line numberDiff line change
@@ -784,13 +784,14 @@ class RSocketMachineImpl<D, M> implements RSocketMachine<D, M> {
784784
}
785785

786786
_handleRequestChannel(streamId: number, frame: RequestChannelFrame): void {
787+
const payload = this._deserializePayload(frame);
787788
const existingSubscription = this._subscriptions.get(streamId);
788789
if (existingSubscription) {
789790
//Likely a duplicate REQUEST_CHANNEL frame, ignore per spec
790791
return;
791792
}
792793

793-
const payloads = new Flowable(subscriber => {
794+
const payloads = new Flowable<Payload<D, M>>(subscriber => {
794795
let firstRequest = true;
795796

796797
subscriber.onSubscribe({
@@ -823,15 +824,13 @@ class RSocketMachineImpl<D, M> implements RSocketMachine<D, M> {
823824
//critically, if n is 0 now, that's okay because we eagerly decremented it
824825
if (firstRequest && n >= 0) {
825826
firstRequest = false;
826-
//release the initial frame we received in frame form due to map operator
827-
subscriber.onNext(frame);
827+
//release the initial payload we received in frame form due to map operator
828+
subscriber.onNext(payload);
828829
}
829830
},
830831
});
831832
}, MAX_REQUEST_N);
832-
const framesToPayloads = new FlowableProcessor(payloads, frame =>
833-
this._deserializePayload(frame),
834-
);
833+
const framesToPayloads = new FlowableProcessor(payloads);
835834
this._receivers.set(streamId, framesToPayloads);
836835

837836
this._requestHandler.requestChannel(framesToPayloads).subscribe({
@@ -892,13 +891,14 @@ class RSocketMachineImpl<D, M> implements RSocketMachine<D, M> {
892891
flags |= FLAGS.COMPLETE;
893892
this._subscriptions.delete(streamId);
894893
}
894+
let metadata;
895895
if (payload.metadata !== undefined &&
896896
payload.metadata !== null) {
897897
// eslint-disable-next-line no-bitwise
898898
flags |= FLAGS.METADATA;
899+
metadata = this._serializers.metadata.serialize(payload.metadata);
899900
}
900901
const data = this._serializers.data.serialize(payload.data);
901-
const metadata = this._serializers.metadata.serialize(payload.metadata);
902902
this._connection.sendOne({
903903
data,
904904
flags,

packages/rsocket-core/src/__tests__/RSocketServer-test.js

+63
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,69 @@ describe('RSocketServer', () => {
142142
});
143143

144144
describe('RequestHandler', () => {
145+
it('deserializes and serializes the channel\'s payload.data', () => {
146+
console.error = jest.fn();
147+
const transport = genMockTransportServer();
148+
149+
const makePayload = (data) => ({
150+
type: FRAME_TYPES.PAYLOAD,
151+
streamId: 1,
152+
flags: 32, // next bit - invoke onNext
153+
data,
154+
});
155+
156+
const server = new RSocketServer({
157+
getRequestHandler: () => {
158+
return {
159+
requestChannel: (incomingFlowable) => {
160+
// If the payload.data has 'name', reply with custom response
161+
return incomingFlowable.map(payload => {
162+
if (payload && payload.data && payload.data.name) {
163+
return { data: { say: 'hello ' + payload.data.name } };
164+
} else {
165+
return payload;
166+
}
167+
});
168+
},
169+
};
170+
},
171+
serializers:JsonSerializers,
172+
transport,
173+
});
174+
175+
server.start();
176+
transport.mock.connect();
177+
connection.receive.mock.publisher.onNext({
178+
type: FRAME_TYPES.SETUP,
179+
data: undefined,
180+
dataMimeType: '<dataMimeType>',
181+
flags: 0,
182+
keepAlive: 42,
183+
lifetime: 2017,
184+
metadata: undefined,
185+
metadataMimeType: '<metadataMimeType>',
186+
resumeToken: null,
187+
streamId: 0,
188+
majorVersion: 1,
189+
minorVersion: 0,
190+
});
191+
connection.receive.mock.publisher.onNext({
192+
type: FRAME_TYPES.REQUEST_CHANNEL,
193+
flags: 0,
194+
requestN: 100,
195+
streamId: 1,
196+
// data along with first REQUEST_CHANNEL frame
197+
data: JSON.stringify({ name: 'Alex' }),
198+
});
199+
// data as separate PAYLOAD frame
200+
connection.receive.mock.publisher.onNext(makePayload(JSON.stringify({ name: 'Bob' })));
201+
jest.runOnlyPendingTimers();
202+
203+
expect(connection.sendOne.mock.calls.length).toBe(3);
204+
expect(connection.sendOne.mock.calls[1][0]).toEqual(makePayload(JSON.stringify({ say: 'hello Alex' })));
205+
expect(connection.sendOne.mock.calls[2][0]).toEqual(makePayload(JSON.stringify({ say: 'hello Bob' })));
206+
});
207+
145208
it('sends error if getRequestHandler throws', () => {
146209
const transport = genMockTransportServer();
147210
const server = new RSocketServer({

0 commit comments

Comments
 (0)