Skip to content

Commit 4444c17

Browse files
committed
feat: (wip) async adapter
- requester fireAndForget - requester requestResponse - requester requestStream refactor: renamed to SubscribingAsyncIterator + added more tests feat: (wip) add async responders - fireAndForget - requestResponse feat: AsyncIterable requestStream responder refactor: use rxjs observer for async iterable requestStream example feat: add requesChannel responders and requesters refactor: remove unnecessary passing of scheduler test: (wip) requester tests test: async requestResponse requesters tests test: async adapter fireAndForget requester tests refactor: apply linting fix: resolve issues from rebasing test: add tests for requestStream requester refactor: rename async package to adapter-async Signed-off-by: Kevin Viglucci <[email protected]>
1 parent 72c2497 commit 4444c17

29 files changed

+3224
-22024
lines changed

package-lock.json

-20,591
This file was deleted.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
import type { Config } from "@jest/types";
2+
import { pathsToModuleNameMapper } from "ts-jest/utils";
3+
import { compilerOptions } from "../../tsconfig.json";
4+
5+
const config: Config.InitialOptions = {
6+
preset: "ts-jest",
7+
testRegex: "(\\/__tests__\\/.*|\\.(test|spec))\\.(ts)$",
8+
moduleNameMapper: pathsToModuleNameMapper(compilerOptions.paths, {
9+
// This has to match the baseUrl defined in tsconfig.json.
10+
prefix: "<rootDir>/../../",
11+
}),
12+
modulePathIgnorePatterns: [
13+
"<rootDir>/__tests__/test-utils",
14+
"<rootDir>/__tests__/*.d.ts",
15+
],
16+
collectCoverage: true,
17+
collectCoverageFrom: ["<rootDir>/src/**/*.ts", "!**/node_modules/**"],
18+
};
19+
20+
export default config;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
{
2+
"name": "@rsocket/adapter-async",
3+
"version": "1.0.0",
4+
"main": "dist/index",
5+
"types": "dist/index",
6+
"files": [
7+
"dist"
8+
],
9+
"scripts": {
10+
"build": "yarn run clean && yarn run compile",
11+
"clean": "rimraf -rf ./dist",
12+
"compile": "tsc -p tsconfig.build.json",
13+
"prepublishOnly": "yarn run build",
14+
"test": "jest"
15+
},
16+
"dependencies": {
17+
"@rsocket/composite-metadata": "^1.0.0",
18+
"@rsocket/core": "^1.0.0",
19+
"@rsocket/messaging": "^1.0.0",
20+
"@rsocket/adapter-rxjs": "^1.0.0",
21+
"rxjs": "^7.4.0",
22+
"rxjs-for-await": "^1.0.0"
23+
},
24+
"devDependencies": {
25+
"rimraf": "~3.0.2",
26+
"typescript": "~4.5.2"
27+
}
28+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
import SubscribingAsyncIterator from "../lib/SubscribingAsyncIterator";
2+
import { mock } from "jest-mock-extended";
3+
import {
4+
OnExtensionSubscriber,
5+
OnNextSubscriber,
6+
OnTerminalSubscriber,
7+
Requestable,
8+
} from "@rsocket/core";
9+
import { Codec } from "@rsocket/messaging";
10+
import BufferingForwardingSubscriber from "../lib/BufferingForwardingSubscriber";
11+
import { Buffer } from "buffer";
12+
13+
jest.useFakeTimers();
14+
15+
describe("BufferingForwardingSubscriber", function () {
16+
it("forwards all received onNext calls when received before subscription", async function () {
17+
const mockSubscriber = mock<
18+
OnNextSubscriber & OnTerminalSubscriber & OnExtensionSubscriber
19+
>();
20+
const testObj = new BufferingForwardingSubscriber();
21+
22+
testObj.onNext({ data: Buffer.from("1") }, false);
23+
testObj.onNext({ data: Buffer.from("2") }, false);
24+
testObj.onNext({ data: Buffer.from("3") }, true);
25+
26+
testObj.subscribe(mockSubscriber);
27+
28+
expect(mockSubscriber.onNext).toBeCalledWith(
29+
{ data: Buffer.from("1") },
30+
false
31+
);
32+
expect(mockSubscriber.onNext).toBeCalledWith(
33+
{ data: Buffer.from("2") },
34+
false
35+
);
36+
expect(mockSubscriber.onNext).toBeCalledWith(
37+
{ data: Buffer.from("3") },
38+
true
39+
);
40+
});
41+
42+
it("forwards all received onNext calls when received after subscription", async function () {
43+
const mockSubscriber = mock<
44+
OnNextSubscriber & OnTerminalSubscriber & OnExtensionSubscriber
45+
>();
46+
const testObj = new BufferingForwardingSubscriber();
47+
48+
testObj.subscribe(mockSubscriber);
49+
50+
testObj.onNext({ data: Buffer.from("1") }, false);
51+
testObj.onNext({ data: Buffer.from("2") }, false);
52+
testObj.onNext({ data: Buffer.from("3") }, true);
53+
54+
expect(mockSubscriber.onNext).toBeCalledWith(
55+
{ data: Buffer.from("1") },
56+
false
57+
);
58+
expect(mockSubscriber.onNext).toBeCalledWith(
59+
{ data: Buffer.from("2") },
60+
false
61+
);
62+
expect(mockSubscriber.onNext).toBeCalledWith(
63+
{ data: Buffer.from("3") },
64+
true
65+
);
66+
});
67+
68+
it("forwards all received onNext calls before forwarding subsequent onComplete", async function () {
69+
const mockSubscriber = mock<
70+
OnNextSubscriber & OnTerminalSubscriber & OnExtensionSubscriber
71+
>();
72+
const testObj = new BufferingForwardingSubscriber();
73+
74+
testObj.subscribe(mockSubscriber);
75+
76+
testObj.onNext({ data: Buffer.from("1") }, false);
77+
testObj.onNext({ data: Buffer.from("2") }, false);
78+
testObj.onNext({ data: Buffer.from("3") }, false);
79+
testObj.onComplete();
80+
81+
expect(mockSubscriber.onNext).toBeCalledWith(
82+
{ data: Buffer.from("1") },
83+
false
84+
);
85+
expect(mockSubscriber.onNext).toBeCalledWith(
86+
{ data: Buffer.from("2") },
87+
false
88+
);
89+
expect(mockSubscriber.onNext).toBeCalledWith(
90+
{ data: Buffer.from("3") },
91+
false
92+
);
93+
expect(mockSubscriber.onComplete).toBeCalledWith();
94+
});
95+
});
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,233 @@
1+
import SubscribingAsyncIterator from "../lib/SubscribingAsyncIterator";
2+
import { mock } from "jest-mock-extended";
3+
import { Cancellable, Requestable } from "@rsocket/core";
4+
import { Codec } from "@rsocket/messaging";
5+
6+
jest.useFakeTimers();
7+
8+
class StringCodec implements Codec<string> {
9+
readonly mimeType: string = "text/plain";
10+
11+
decode(buffer: Buffer): string {
12+
return buffer.toString();
13+
}
14+
15+
encode(entity: string): Buffer {
16+
return Buffer.from(entity);
17+
}
18+
}
19+
20+
describe("SubscribingAsyncIterator", function () {
21+
it("iterates over emitted values", async function () {
22+
let subscriber;
23+
const subscription = mock<Requestable & Cancellable>({
24+
request(requestN: number) {
25+
for (let i = 0; i < requestN; i++) {
26+
setTimeout(() => {
27+
subscriber.onNext(
28+
{
29+
data: Buffer.from(`${i}`),
30+
metadata: undefined,
31+
},
32+
i === requestN - 1
33+
);
34+
});
35+
}
36+
},
37+
});
38+
const requestSpy = jest.spyOn(subscription, "request");
39+
40+
const initialRequestN = 3;
41+
subscriber = new SubscribingAsyncIterator(
42+
subscription,
43+
initialRequestN * 2,
44+
new StringCodec()
45+
);
46+
subscription.request(initialRequestN);
47+
48+
jest.runAllTimers();
49+
50+
const values = [];
51+
for await (const value of subscriber) {
52+
jest.runAllTimers();
53+
values.push(value);
54+
}
55+
56+
expect(values).toStrictEqual(["0", "1", "2"]);
57+
expect(requestSpy).toBeCalledTimes(1);
58+
});
59+
60+
it("iterates over emitted values until onComplete", async function () {
61+
let subscriber;
62+
const subscription = mock<Requestable & Cancellable>({
63+
request(requestN: number) {
64+
for (let i = 0; i < requestN; i++) {
65+
setTimeout(() => {
66+
if (i === requestN - 1) {
67+
subscriber.onComplete();
68+
} else {
69+
subscriber.onNext(
70+
{
71+
data: Buffer.from(`${i}`),
72+
metadata: undefined,
73+
},
74+
false
75+
);
76+
}
77+
});
78+
}
79+
},
80+
});
81+
const requestSpy = jest.spyOn(subscription, "request");
82+
83+
const initialRequestN = 3;
84+
subscriber = new SubscribingAsyncIterator(
85+
subscription,
86+
initialRequestN * 2,
87+
new StringCodec()
88+
);
89+
subscription.request(initialRequestN);
90+
91+
jest.runAllTimers();
92+
93+
const values = [];
94+
for await (const value of subscriber) {
95+
jest.runAllTimers();
96+
values.push(value);
97+
}
98+
99+
expect(values).toStrictEqual(["0", "1"]);
100+
expect(requestSpy).toBeCalledTimes(1);
101+
});
102+
103+
it("cancels when break statement reached", async function () {
104+
let subscriber;
105+
const subscription = mock<Requestable & Cancellable>({
106+
request(requestN: number) {
107+
for (let i = 0; i < requestN; i++) {
108+
setTimeout(() => {
109+
subscriber.onNext(
110+
{
111+
data: Buffer.from(`${i}`),
112+
metadata: undefined,
113+
},
114+
i === requestN - 1
115+
);
116+
});
117+
}
118+
},
119+
});
120+
const requestSpy = jest.spyOn(subscription, "request");
121+
const cancelSpy = jest.spyOn(subscription, "cancel");
122+
123+
const initialRequestN = 10;
124+
subscriber = new SubscribingAsyncIterator(
125+
subscription,
126+
initialRequestN * 2,
127+
new StringCodec()
128+
);
129+
subscription.request(initialRequestN);
130+
131+
jest.runAllTimers();
132+
133+
const values = [];
134+
for await (const value of subscriber) {
135+
if (values.length == 2) {
136+
break;
137+
}
138+
jest.runAllTimers();
139+
values.push(value);
140+
}
141+
142+
expect(values).toStrictEqual(["0", "1"]);
143+
expect(requestSpy).toBeCalledTimes(1);
144+
expect(requestSpy).toBeCalledWith(10);
145+
expect(cancelSpy).toBeCalledTimes(1);
146+
});
147+
148+
it("ends and throws with emitted exception", async function () {
149+
let subscriber;
150+
const expectedError = new Error("test error");
151+
const subscription = mock<Requestable & Cancellable>({
152+
request(requestN: number) {
153+
setTimeout(() => {
154+
subscriber.onError(expectedError);
155+
});
156+
},
157+
});
158+
const requestSpy = jest.spyOn(subscription, "request");
159+
160+
const initialRequestN = 10;
161+
subscriber = new SubscribingAsyncIterator(
162+
subscription,
163+
initialRequestN * 2,
164+
new StringCodec()
165+
);
166+
subscription.request(initialRequestN);
167+
168+
jest.runAllTimers();
169+
170+
const values = [];
171+
172+
let capturedError;
173+
try {
174+
for await (const value of subscriber) {
175+
jest.runAllTimers();
176+
values.push(value);
177+
}
178+
} catch (error) {
179+
capturedError = error;
180+
}
181+
182+
expect(capturedError).toBe(expectedError);
183+
expect(values).toStrictEqual([]);
184+
expect(requestSpy).toBeCalledWith(10);
185+
});
186+
187+
it("cancels on exception processing emitted value", async function () {
188+
let subscriber;
189+
const subscription = mock<Requestable & Cancellable>({
190+
request(requestN: number) {
191+
for (let i = 0; i < requestN; i++) {
192+
setTimeout(() => {
193+
subscriber.onNext(
194+
{
195+
data: Buffer.from(`${i}`),
196+
metadata: undefined,
197+
},
198+
i === requestN - 1
199+
);
200+
});
201+
}
202+
},
203+
});
204+
const requestSpy = jest.spyOn(subscription, "request");
205+
const cancelSpy = jest.spyOn(subscription, "cancel");
206+
207+
const initialRequestN = 10;
208+
subscriber = new SubscribingAsyncIterator(
209+
subscription,
210+
initialRequestN * 2,
211+
new StringCodec()
212+
);
213+
subscription.request(initialRequestN);
214+
215+
jest.runAllTimers();
216+
217+
const values = [];
218+
try {
219+
for await (const value of subscriber) {
220+
if (values.length == 2) {
221+
throw new Error("test error");
222+
}
223+
values.push(value);
224+
jest.runAllTimers();
225+
}
226+
} catch (e) {}
227+
228+
expect(values).toStrictEqual(["0", "1"]);
229+
expect(requestSpy).toBeCalledTimes(1);
230+
expect(requestSpy).toBeCalledWith(10);
231+
expect(cancelSpy).toBeCalledTimes(1);
232+
});
233+
});

0 commit comments

Comments
 (0)