@@ -60,25 +60,25 @@ abstract class TransportTest : SuspendTest, TestWithLeakCheck {
60
60
@Test
61
61
fun fireAndForget10 () = test {
62
62
(1 .. 10 ).map { async { client.fireAndForget(payload(it)) } }.awaitAll()
63
- delay(1000 ) // TODO: leak check
63
+ // delay(1000) //TODO: leak check
64
64
}
65
65
66
66
@Test
67
67
open fun largePayloadFireAndForget10 () = test {
68
68
(1 .. 10 ).map { async { client.fireAndForget(requesterLargePayload) } }.awaitAll()
69
- delay(1000 ) // TODO: leak check
69
+ // delay(1000) //TODO: leak check
70
70
}
71
71
72
72
@Test
73
73
fun metadataPush10 () = test {
74
74
(1 .. 10 ).map { async { client.metadataPush(packet(requesterData)) } }.awaitAll()
75
- delay(1000 ) // TODO: leak check
75
+ // delay(1000) //TODO: leak check
76
76
}
77
77
78
78
@Test
79
79
open fun largePayloadMetadataPush10 () = test {
80
80
(1 .. 10 ).map { async { client.metadataPush(packet(requesterLargeData)) } }.awaitAll()
81
- delay(1000 ) // TODO: leak check
81
+ // delay(1000) //TODO: leak check
82
82
}
83
83
84
84
@Test
@@ -89,106 +89,116 @@ abstract class TransportTest : SuspendTest, TestWithLeakCheck {
89
89
90
90
@Test
91
91
fun requestChannel1 () = test(10 .seconds) {
92
- val list = client.requestChannel(payload(0 ), flowOf(payload(0 ))).onEach { it.close() }.toList()
93
- assertEquals(1 , list.size)
92
+ val count =
93
+ client.requestChannel(payload(0 ), flowOf(payload(0 )))
94
+ .onEach { it.close() }
95
+ .count()
96
+ assertEquals(1 , count)
94
97
}
95
98
96
99
@Test
97
100
fun requestChannel3 () = test {
98
101
val request = flow {
99
102
repeat(3 ) { emit(payload(it)) }
100
103
}
101
- val list =
102
- client.requestChannel(payload(0 ), request).flowOn(PrefetchStrategy (3 , 0 )).onEach { it.close() }.toList()
103
- assertEquals(3 , list.size)
104
+ val count =
105
+ client.requestChannel(payload(0 ), request)
106
+ .flowOn(PrefetchStrategy (3 , 0 ))
107
+ .onEach { it.close() }
108
+ .count()
109
+ assertEquals(3 , count)
104
110
}
105
111
106
112
@Test
107
113
open fun largePayloadRequestChannel200 () = test {
108
114
val request = flow {
109
115
repeat(200 ) { emit(requesterLargePayload) }
110
116
}
111
- val list =
117
+ val count =
112
118
client.requestChannel(requesterLargePayload, request)
113
119
.flowOn(PrefetchStrategy (Int .MAX_VALUE , 0 ))
114
120
.onEach { it.close() }
115
- .toList ()
116
- assertEquals(200 , list.size )
121
+ .count ()
122
+ assertEquals(200 , count )
117
123
}
118
124
119
125
@Test
120
- @Ignore // flaky, ignore for now
126
+ // @Ignore //flaky, ignore for now
121
127
fun requestChannel20000 () = test {
122
128
val request = flow {
123
129
repeat(20_000 ) { emit(payload(7 )) }
124
130
}
125
- val list = client.requestChannel(payload(7 ), request).flowOn(PrefetchStrategy (Int .MAX_VALUE , 0 )).onEach {
131
+ val count = client.requestChannel(payload(7 ), request).flowOn(PrefetchStrategy (Int .MAX_VALUE , 0 )).onEach {
126
132
assertEquals(requesterData, it.data.readText())
127
133
assertEquals(requesterMetadata, it.metadata?.readText())
128
- }.toList ()
129
- assertEquals(20_000 , list.size )
134
+ }.count ()
135
+ assertEquals(20_000 , count )
130
136
}
131
137
132
138
@Test
133
- @Ignore // flaky, ignore for now
139
+ // @Ignore //flaky, ignore for now
134
140
fun requestChannel200000 () = test {
135
141
val request = flow {
136
142
repeat(200_000 ) { emit(payload(it)) }
137
143
}
138
- val list =
139
- client.requestChannel(payload(0 ), request).flowOn(PrefetchStrategy (10000 , 0 )).onEach { it.close() }.toList()
140
- assertEquals(200_000 , list.size)
144
+ val count =
145
+ client.requestChannel(payload(0 ), request)
146
+ .flowOn(PrefetchStrategy (10000 , 0 ))
147
+ .onEach { it.close() }
148
+ .count()
149
+ assertEquals(200_000 , count)
141
150
}
142
151
143
152
@Test
144
- @Ignore // flaky, ignore for now
153
+ // @Ignore //flaky, ignore for now
145
154
fun requestChannel16x256 () = test {
146
155
val request = flow {
147
156
repeat(256 ) {
148
157
emit(payload(it))
149
158
}
150
159
}
151
- (0 .. 16 ).map {
152
- async( Dispatchers . Default ) {
153
- val list = client.requestChannel(payload(0 ), request).onEach { it.close() }.toList ()
154
- assertEquals(256 , list.size )
160
+ (0 .. 16 ).map { r ->
161
+ async {
162
+ val count = client.requestChannel(payload(0 ), request).onEach { it.close() }.count ()
163
+ assertEquals(256 , count )
155
164
}
156
165
}.awaitAll()
157
166
}
158
167
159
168
@Test
160
- @Ignore // flaky, ignore for now
169
+ // @Ignore //flaky, ignore for now
161
170
fun requestChannel256x512 () = test {
162
171
val request = flow {
163
172
repeat(512 ) {
164
173
emit(payload(it))
165
174
}
166
175
}
167
176
(0 .. 256 ).map {
168
- async( Dispatchers . Default ) {
169
- val list = client.requestChannel(payload(0 ), request).onEach { it.close() }.toList ()
170
- assertEquals(512 , list.size )
177
+ async {
178
+ val count = client.requestChannel(payload(0 ), request).onEach { it.close() }.count ()
179
+ assertEquals(512 , count )
171
180
}
172
181
}.awaitAll()
173
182
}
174
183
175
184
@Test
176
- @Ignore // flaky, ignore for now
185
+ // @Ignore //flaky, ignore for now
177
186
fun requestChannel500NoLeak () = test {
178
187
val request = flow {
179
188
repeat(10_000 ) { emitOrClose(payload(3 )) }
180
189
}
181
- val list =
190
+ val count =
182
191
client
183
192
.requestChannel(payload(3 ), request)
184
193
.flowOn(PrefetchStrategy (Int .MAX_VALUE , 0 ))
185
194
.take(500 )
186
195
.onEach {
187
196
assertEquals(requesterData, it.data.readText())
188
197
assertEquals(requesterMetadata, it.metadata?.readText())
189
- }.toList()
190
- assertEquals(500 , list.size)
191
- delay(1000 ) // TODO: leak check
198
+ }
199
+ .count()
200
+ assertEquals(500 , count)
201
+ // (1000) //TODO: leak check
192
202
}
193
203
194
204
@Test
@@ -212,42 +222,42 @@ abstract class TransportTest : SuspendTest, TestWithLeakCheck {
212
222
}
213
223
214
224
@Test
215
- @Ignore // flaky, ignore for now
225
+ // @Ignore //flaky, ignore for now
216
226
fun requestResponse10000 () = test {
217
227
(1 .. 10000 ).map { async { client.requestResponse(payload(3 )).let (Companion ::checkPayload) } }.awaitAll()
218
228
}
219
229
220
230
@Test
221
- @Ignore // flaky, ignore for now
231
+ // @Ignore //flaky, ignore for now
222
232
fun requestResponse100000 () = test {
223
233
repeat(100000 ) { client.requestResponse(payload(3 )).let (Companion ::checkPayload) }
224
234
}
225
235
226
236
@Test
227
237
fun requestStream5 () = test {
228
- val list =
229
- client.requestStream(payload(3 )).flowOn(PrefetchStrategy (5 , 0 )).take(5 ).onEach { checkPayload(it) }.toList ()
230
- assertEquals(5 , list.size )
238
+ val count =
239
+ client.requestStream(payload(3 )).flowOn(PrefetchStrategy (5 , 0 )).take(5 ).onEach { checkPayload(it) }.count ()
240
+ assertEquals(5 , count )
231
241
}
232
242
233
243
@Test
234
244
fun requestStream10000 () = test {
235
- val list = client.requestStream(payload(3 )).onEach { checkPayload(it) }.toList ()
236
- assertEquals(10000 , list.size )
245
+ val count = client.requestStream(payload(3 )).onEach { checkPayload(it) }.count ()
246
+ assertEquals(10000 , count )
237
247
}
238
248
239
249
@Test
240
- @Ignore // flaky, ignore for now
250
+ // @Ignore //flaky, ignore for now
241
251
fun requestStream500NoLeak () = test {
242
- val list =
252
+ val count =
243
253
client
244
254
.requestStream(payload(3 ))
245
255
.flowOn(PrefetchStrategy (Int .MAX_VALUE , 0 ))
246
256
.take(500 )
247
257
.onEach { checkPayload(it) }
248
- .toList ()
249
- assertEquals(500 , list.size )
250
- delay(1000 ) // TODO: leak check
258
+ .count ()
259
+ assertEquals(500 , count )
260
+ // delay(1000) //TODO: leak check
251
261
}
252
262
253
263
companion object {
0 commit comments