Skip to content

Commit 013f048

Browse files
committed
adds support for flowable to async iterable conversion
Signed-off-by: Oleh Dokuka <[email protected]>
1 parent 8efe639 commit 013f048

File tree

8 files changed

+1018
-434
lines changed

8 files changed

+1018
-434
lines changed

.flowconfig

+1-1
Original file line numberDiff line numberDiff line change
@@ -28,4 +28,4 @@ suppress_type=$FlowFixMe
2828
suppress_type=$FixMe
2929

3030
[version]
31-
^0.134.0
31+
^0.136.0

package.json

+19-17
Original file line numberDiff line numberDiff line change
@@ -32,41 +32,43 @@
3232
"packages/*"
3333
],
3434
"devDependencies": {
35-
"@babel/cli": "^7.11.6",
36-
"@babel/core": "^7.11.6",
35+
"@babel/cli": "^7.12.1",
36+
"@babel/core": "^7.12.3",
37+
"babel-jest": "^26.6.0",
3738
"babel-eslint": "^10.1.0",
3839
"babel-plugin-minify-replace": "^0.5.0",
39-
"@babel/plugin-transform-async-to-generator": "^7.10.4",
40-
"@babel/plugin-proposal-class-properties": "^7.10.4",
41-
"@babel/plugin-transform-modules-commonjs": "^7.10.4",
42-
"@babel/plugin-transform-flow-strip-types": "^7.10.4",
43-
"@babel/plugin-proposal-object-rest-spread": "^7.11.0",
40+
"@babel/plugin-transform-async-to-generator": "^7.12.1",
41+
"@babel/plugin-proposal-async-generator-functions": "^7.12.1",
42+
"@babel/plugin-proposal-class-properties": "^7.12.1",
43+
"@babel/plugin-transform-modules-commonjs": "^7.12.1",
44+
"@babel/plugin-transform-flow-strip-types": "^7.12.1",
45+
"@babel/plugin-proposal-object-rest-spread": "^7.12.1",
4446
"@babel/plugin-transform-runtime": "^7.11.5",
45-
"@babel/polyfill": "^7.11.5",
47+
"@babel/polyfill": "^7.12.1",
48+
"@babel/runtime": "^7.12.1",
4649
"babel-preset-fbjs": "^3.3.0",
47-
"@babel/runtime": "^7.11.2",
4850
"buffer": "^5.6.0",
4951
"chalk": "^4.1.0",
5052
"cross-env": "^7.0.2",
51-
"eslint": "^7.10.0",
53+
"eslint": "^7.11.0",
5254
"eslint-config-fb-strict": "^26.0.0",
5355
"eslint-plugin-babel": "^5.3.1",
5456
"eslint-plugin-flowtype": "^5.2.0",
5557
"eslint-plugin-jasmine": "^4.1.1",
5658
"eslint-plugin-jsx-a11y": "^6.3.1",
5759
"eslint-plugin-prefer-object-spread": "^1.2.1",
58-
"eslint-plugin-react": "^7.21.2",
60+
"eslint-plugin-react": "^7.21.5",
5961
"eslint-plugin-relay": "^1.8.1",
60-
"eslint-plugin-jest": "^24.0.2",
61-
"fbjs": "^2.0.0",
62-
"fbjs-scripts": "^2.0.0",
63-
"flow-bin": "^0.134.0",
62+
"eslint-plugin-jest": "^24.1.0",
63+
"fbjs": "^3.0.0",
64+
"fbjs-scripts": "^3.0.0",
65+
"flow-bin": "^0.136.0",
6466
"glob": "^7.1.6",
65-
"jest": "^26.4.2",
67+
"jest": "^26.6.0",
6668
"lerna": "^3.22.1",
6769
"object-assign": "^4.1.1",
6870
"prettier": "2.1.2",
69-
"rollup": "^2.28.2",
71+
"rollup": "^2.32.1",
7072
"@rollup/plugin-babel": "^5.2.1",
7173
"@rollup/plugin-commonjs": "^15.1.0",
7274
"@rollup/plugin-node-resolve": "^9.0.0",

packages/rsocket-flowable/src/Flowable.js

+8-6
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import type {
2626

2727
import FlowableMapOperator from './FlowableMapOperator';
2828
import FlowableTakeOperator from './FlowableTakeOperator';
29+
import FlowableAsyncIterable from './FlowableAsyncIterable';
2930

3031
import invariant from 'fbjs/lib/invariant';
3132
import warning from 'fbjs/lib/warning';
@@ -117,6 +118,10 @@ export default class Flowable<T> implements IPublisher<T> {
117118
);
118119
}
119120

121+
toAsyncIterable(prefetch: number = 256): AsyncIterable<T> {
122+
return new FlowableAsyncIterable<T>(this, prefetch);
123+
}
124+
120125
_wrapCallback(callback: (T) => void): IPartialSubscriber<T> {
121126
const max = this._max;
122127
return {
@@ -255,17 +260,14 @@ class FlowableSubscriber<T> implements ISubscriber<T> {
255260

256261
_request = (n: number) => {
257262
invariant(
258-
Number.isInteger(n) && n >= 1 && n <= this._max,
259-
'Flowable: Expected request value to be an integer with a ' +
260-
'value greater than 0 and less than or equal to %s, got ' +
261-
'`%s`.',
262-
this._max,
263+
Number.isInteger(n) && n >= 1,
264+
'Flowable: Expected request value to be an integer with a value greater than 0, got `%s`.',
263265
n,
264266
);
265267
if (!this._active) {
266268
return;
267269
}
268-
if (n === this._max) {
270+
if (n >= this._max) {
269271
this._pending = this._max;
270272
} else {
271273
this._pending += n;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
/** Copyright (c) Facebook, Inc. and its affiliates.
2+
*
3+
* Licensed under the Apache License, Version 2.0 (the "License");
4+
* you may not use this file except in compliance with the License.
5+
* You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software
10+
* distributed under the License is distributed on an "AS IS" BASIS,
11+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
* See the License for the specific language governing permissions and
13+
* limitations under the License.
14+
*
15+
* @flow
16+
*/
17+
18+
'use strict';
19+
20+
import type {ISubscriber, ISubscription} from 'rsocket-types';
21+
import Flowable from './Flowable';
22+
23+
// $FlowFixMe
24+
export default class FlowableAsyncIterable<T> implements AsyncIterable<T> {
25+
_source: Flowable<T>;
26+
_prefetch: number;
27+
28+
constructor(source: Flowable<T>, prefetch: number = 256) {
29+
this._source = source;
30+
this._prefetch = prefetch;
31+
}
32+
33+
asyncIterator(): AsyncIterator<T> {
34+
const asyncIteratorSubscriber = new AsyncIteratorSubscriber(this._prefetch);
35+
this._source.subscribe(asyncIteratorSubscriber);
36+
return asyncIteratorSubscriber;
37+
}
38+
39+
// $FlowFixMe
40+
[Symbol.asyncIterator](): AsyncIterator<T> {
41+
return this.asyncIterator();
42+
}
43+
}
44+
45+
// $FlowFixMe
46+
class AsyncIteratorSubscriber<T> implements ISubscriber<T>, AsyncIterator<T> {
47+
_values: T[];
48+
_prefetch: number;
49+
_limit: number;
50+
51+
_subscription: ISubscription;
52+
53+
_produced: number;
54+
55+
_done: boolean;
56+
_error: Error;
57+
58+
_resolve: ?(
59+
result: Promise<IteratorResult<T, void>> | IteratorResult<T, void>,
60+
) => void;
61+
_reject: ?(reason?: any) => void;
62+
63+
constructor(prefetch: number = 256) {
64+
this._prefetch = prefetch;
65+
this._values = [];
66+
this._limit =
67+
prefetch === Number.MAX_SAFE_INTEGER
68+
? Number.MAX_SAFE_INTEGER
69+
: prefetch - (prefetch >> 2);
70+
this._produced = 0;
71+
}
72+
73+
onSubscribe(subscription: ISubscription): void {
74+
this._subscription = subscription;
75+
subscription.request(this._prefetch);
76+
}
77+
78+
onNext(value: T): void {
79+
const resolve = this._resolve;
80+
if (resolve) {
81+
this._resolve = undefined;
82+
this._reject = undefined;
83+
84+
if (++this._produced === this._limit) {
85+
this._produced = 0;
86+
this._subscription.request(this._limit);
87+
}
88+
89+
resolve({done: false, value});
90+
return;
91+
}
92+
93+
this._values.push(value);
94+
}
95+
96+
onComplete(): void {
97+
this._done = true;
98+
99+
const resolve = this._resolve;
100+
if (resolve) {
101+
this._resolve = undefined;
102+
this._reject = undefined;
103+
104+
resolve({done: true});
105+
}
106+
}
107+
108+
onError(error: Error): void {
109+
this._done = true;
110+
this._error = error;
111+
112+
const reject = this._reject;
113+
if (reject) {
114+
this._resolve = undefined;
115+
this._reject = undefined;
116+
117+
reject(error);
118+
}
119+
}
120+
121+
next(): Promise<IteratorResult<T, void>> {
122+
const value = this._values.shift();
123+
if (value) {
124+
if (++this._produced === this._limit) {
125+
this._produced = 0;
126+
this._subscription.request(this._limit);
127+
}
128+
129+
return Promise.resolve({done: false, value});
130+
} else if (this._done) {
131+
if (this._error) {
132+
return Promise.reject(this._error);
133+
} else {
134+
return Promise.resolve({done: true});
135+
}
136+
} else {
137+
return new Promise((resolve, reject) => {
138+
this._resolve = resolve;
139+
this._reject = reject;
140+
});
141+
}
142+
}
143+
144+
return(): Promise<IteratorResult<T, void>> {
145+
this._subscription.cancel();
146+
return Promise.resolve({done: true});
147+
}
148+
149+
// $FlowFixMe
150+
[Symbol.asyncIterator](): AsyncIterator<T> {
151+
return this;
152+
}
153+
}

0 commit comments

Comments
 (0)