diff --git a/lib/src/client/transport/fetch_transport.dart b/lib/src/client/transport/fetch_transport.dart new file mode 100644 index 00000000..3a2bcf26 --- /dev/null +++ b/lib/src/client/transport/fetch_transport.dart @@ -0,0 +1,453 @@ +// Copyright (c) 2022, the gRPC project authors. Please see the AUTHORS file +// for details. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +import 'dart:async'; +import 'dart:convert'; +import 'dart:html'; +import 'dart:js_util' as js_util; +import 'dart:typed_data'; + +import 'package:async/async.dart'; +import 'package:js/js.dart'; +import 'package:meta/meta.dart'; + +import '../../client/call.dart'; +import '../../shared/message.dart'; +import '../../shared/status.dart'; +import '../connection.dart'; +import 'cors.dart' as cors; +import 'transport.dart'; +import 'web_streams.dart'; + +const _contentTypeKey = 'Content-Type'; + +@JS() +class AbortSignal { + external factory AbortSignal(); + external bool get aborted; +} + +@JS() +class AbortController { + external factory AbortController(); + external void abort([dynamic reason]); + external AbortSignal get signal; +} + +@anonymous +// ignore: missing_js_lib_annotation +@JS() +class RequestInit { + external factory RequestInit( + {required String method, + Object? headers, + List? body, + AbortSignal? signal, + required String referrerPolicy, + required String mode, + required String credentials, + required String cache, + required String redirect, + required String integrity, + required bool keepalive}); + + external String get method; + external set method(String newValue); + + external Object? get headers; + external set headers(Object? newValue); + + external Uint8List? get body; + external set body(Uint8List? newValue); + + external AbortSignal? get signal; + external set signal(AbortSignal? newValue); + + external String get referrerPolicy; + external set referrerPolicy(String newValue); + + external String get mode; + external set mode(String newValue); + + external String get credentials; + external set credentials(String newValue); + + external String get cache; + external set cache(String newValue); + + external String get redirect; + external set redirect(String newValue); + + external String get integrity; + external set integrity(String newValue); + + external bool get keepalive; + external set keepalive(bool newValue); +} + +/// Implementation of Fetch API simulating @HttpRequest for minimal changes +class FetchHttpRequest { + // Request parameters + var method = 'GET'; + var uri = ''; + var referrerPolicy = 'origin'; + var mode = 'cors'; + var credentials = 'omit'; + var cache = 'default'; + var redirect = 'follow'; + var integrity = ''; + var keepAlive = true; + var headers = {}; + var readyState = HttpRequest.UNSENT; + set withCredentials(bool value) => credentials = value ? 'include' : 'omit'; + set responseType(String unused) {} + + // Streams and controllers + final onReadyStateChangeController = StreamController.broadcast(); + Stream get onReadyStateChange => onReadyStateChangeController.stream; + final onProgressController = StreamController.broadcast(); + Stream get onProgress => onProgressController.stream; + final onErrorController = StreamController.broadcast(); + Stream get onError => onErrorController.stream; + + // Response information + AbortController? _abortController; + CancelableOperation? _cancelableSend; + dynamic _response; + Uint8List? _lastResponse; + String? get response => responseText; + int get status => + _response != null ? js_util.getProperty(_response, 'status') : 0; + Map get responseHeaders => _response != null + ? toDartMap(js_util.getProperty(_response, 'headers')) + : {}; + String? get responseText => _lastResponse != null + ? utf8.decode(_lastResponse!, allowMalformed: true) + : null; + dynamic get body => + _response != null ? js_util.getProperty(_response, 'body') : null; + + static Map toDartMap(Headers obj) => + Map.fromIterable(getObjectKeys(obj), + value: (key) => js_util.callMethod(obj, 'get', [key]).toString()); + + static List getObjectKeys(Headers obj) { + final keys = js_util.callMethod(obj, 'keys', []); + // This used to work prior to flutter 3.0 now we type check to see if supported + if (keys is Iterable) { + return List.from(keys); + } + + // Otherwise we have to fall back and manually iterate through the javascript iterator + final res = List.empty(growable: true); + while (true) { + final next = js_util.callMethod(keys, 'next', []); + if (js_util.getProperty(next, 'done')) { + break; + } + res.add(js_util.getProperty(next, 'value').toString()); + } + return res; + } + + Future send([List? data]) async { + final doSend = _doSend(data); + _cancelableSend = CancelableOperation.fromFuture(doSend); + await doSend; + } + + Future _doSend([List? data]) async { + final wgs = WorkerGlobalScope.instance; + _setReadyState(HttpRequest.LOADING); + + _abortController = AbortController(); + final init = RequestInit( + cache: cache, + credentials: credentials, + integrity: integrity, + keepalive: keepAlive, + method: method, + mode: mode, + redirect: redirect, + referrerPolicy: referrerPolicy, + signal: _abortController?.signal, + body: data, + headers: js_util.jsify(headers)); + + _response = await js_util + .promiseToFuture(js_util.callMethod(wgs, 'fetch', [uri, init])) + .onError((error, stackTrace) => null, + test: (error) => _abortController?.signal.aborted ?? false); + if (_response == null || (_cancelableSend?.isCanceled ?? false)) { + return; + } + + _setReadyState(HttpRequest.HEADERS_RECEIVED); + if (status < 200 || status >= 300) { + onErrorController.add(status); + } + + final stream = body; + final reader = + stream != null ? js_util.callMethod(stream, 'getReader', []) : null; + if (reader == null) { + onErrorController.add(0); + return; + } + + while (true) { + final result = await js_util + .promiseToFuture(js_util.callMethod(reader, 'read', [])) + .onError((error, stackTrace) => null, + test: (error) => _abortController?.signal.aborted ?? false); + if (result == null || (_cancelableSend?.isCanceled ?? false)) { + return; + } + final value = js_util.getProperty(result, 'value'); + if (value != null) { + _lastResponse = value; + onProgressController.add(value as Uint8List); + } + if (js_util.getProperty(result, 'done')) { + _lastResponse ??= Uint8List(0); + _setReadyState(HttpRequest.DONE); + break; + } + } + } + + void _setReadyState(int state) { + readyState = state; + onReadyStateChangeController.add(state); + if (state == HttpRequest.DONE) {} + } + + void open(String method, String uri) { + this.method = method; + this.uri = uri; + _setReadyState(HttpRequest.OPENED); + } + + void abort() async { + _abortController?.abort(); + await _cancelableSend?.cancel(); + close(); + } + + void close() { + onReadyStateChangeController.close(); + onProgressController.close(); + onErrorController.close(); + _response = null; + } + + void setRequestHeader(String name, String value) { + headers[name] = value; + } + + void overrideMimeType(String mimeType) {} +} + +class FetchTransportStream implements GrpcTransportStream { + final FetchHttpRequest _request; + final ErrorHandler _onError; + final Function(FetchTransportStream stream) _onDone; + bool _headersReceived = false; + final StreamController _incomingProcessor = StreamController(); + final StreamController _incomingMessages = StreamController(); + final StreamController> _outgoingMessages = StreamController(); + + @override + Stream get incomingMessages => _incomingMessages.stream; + + @override + StreamSink> get outgoingMessages => _outgoingMessages.sink; + + FetchTransportStream(this._request, + {required ErrorHandler onError, required onDone}) + : _onError = onError, + _onDone = onDone { + _outgoingMessages.stream + .map(frame) + .listen((data) => _request.send(data), cancelOnError: true); + + _request.onReadyStateChange.listen((data) { + if (_incomingProcessor.isClosed) { + return; + } + switch (_request.readyState) { + case HttpRequest.HEADERS_RECEIVED: + _onHeadersReceived(); + break; + case HttpRequest.DONE: + _onRequestDone(); + _close(); + break; + } + }); + + _request.onError.listen((_) { + if (_incomingProcessor.isClosed) { + return; + } + _onError(GrpcError.unavailable('FetchTransportStream connection-error'), + StackTrace.current); + terminate(); + }); + + _request.onProgress.listen((bytes) { + if (_incomingProcessor.isClosed) { + return; + } + _incomingProcessor.add(bytes.buffer); + }); + + _incomingProcessor.stream + .transform(GrpcWebDecoder()) + .transform(grpcDecompressor()) + .listen(_incomingMessages.add, + onError: _onError, onDone: _incomingMessages.close); + } + + bool _validateResponseState() { + try { + validateHttpStatusAndContentType( + _request.status, _request.responseHeaders, + rawResponse: _request.responseText); + return true; + } catch (e, st) { + _onError(e, st); + return false; + } + } + + void _onHeadersReceived() { + _headersReceived = true; + if (!_validateResponseState()) { + return; + } + _incomingMessages.add(GrpcMetadata(_request.responseHeaders)); + } + + void _onRequestDone() { + if (!_headersReceived && !_validateResponseState()) { + return; + } + if (_request.response == null) { + _onError( + GrpcError.unavailable('FetchTransportStream request null response', + null, _request.responseText), + StackTrace.current); + return; + } + } + + void _close() { + _incomingProcessor.close(); + _outgoingMessages.close(); + _onDone(this); + } + + @override + Future terminate() async { + _close(); + _request.abort(); + } +} + +class FetchClientConnection extends ClientConnection { + final Uri uri; + + final _requests = {}; + + FetchClientConnection(this.uri); + + @override + String get authority => uri.authority; + @override + String get scheme => uri.scheme; + + void _initializeRequest( + FetchHttpRequest request, Map metadata) { + for (final header in metadata.keys) { + request.setRequestHeader(header, metadata[header]!); + } + // Overriding the mimetype allows us to stream and parse the data + request.overrideMimeType('text/plain; charset=x-user-defined'); + request.responseType = 'text'; + } + + @visibleForTesting + FetchHttpRequest createRequest() => FetchHttpRequest(); + + @override + GrpcTransportStream makeRequest(String path, Duration? timeout, + Map metadata, ErrorHandler onError, + {CallOptions? callOptions}) { + // gRPC-web headers. + if (_getContentTypeHeader(metadata) == null) { + metadata['Content-Type'] = 'application/grpc-web+proto'; + metadata['X-User-Agent'] = 'grpc-web-dart/0.1'; + metadata['X-Grpc-Web'] = '1'; + } + + var requestUri = uri.resolve(path); + if (callOptions is WebCallOptions && + callOptions.bypassCorsPreflight == true) { + requestUri = cors.moveHttpHeadersToQueryParam(metadata, requestUri); + } + + final request = createRequest(); + request.open('POST', requestUri.toString()); + if (callOptions is WebCallOptions && callOptions.withCredentials == true) { + request.withCredentials = true; + } + // Must set headers after calling open(). + _initializeRequest(request, metadata); + + final transportStream = + FetchTransportStream(request, onError: onError, onDone: _removeStream); + _requests.add(transportStream); + return transportStream; + } + + void _removeStream(FetchTransportStream stream) { + _requests.remove(stream); + } + + @override + Future terminate() async { + for (var request in List.of(_requests)) { + request.terminate(); + } + } + + @override + void dispatchCall(ClientCall call) { + call.onConnectionReady(this); + } + + @override + Future shutdown() async {} +} + +MapEntry? _getContentTypeHeader(Map metadata) { + for (var entry in metadata.entries) { + if (entry.key.toLowerCase() == _contentTypeKey.toLowerCase()) { + return entry; + } + } + return null; +} diff --git a/lib/src/client/web_channel.dart b/lib/src/client/web_channel.dart index f81da14f..1ce9824b 100644 --- a/lib/src/client/web_channel.dart +++ b/lib/src/client/web_channel.dart @@ -15,7 +15,8 @@ import 'channel.dart'; import 'connection.dart'; -import 'transport/xhr_transport.dart'; +import 'transport/fetch_transport.dart'; +//import 'transport/xhr_transport.dart'; /// A channel to a grpc-web endpoint. class GrpcWebClientChannel extends ClientChannelBase { @@ -25,6 +26,7 @@ class GrpcWebClientChannel extends ClientChannelBase { @override ClientConnection createConnection() { - return XhrClientConnection(uri); + //return XhrClientConnection(uri); + return FetchClientConnection(uri); } } diff --git a/test/client_tests/client_fetch_transport_test.dart b/test/client_tests/client_fetch_transport_test.dart new file mode 100644 index 00000000..ccfe907b --- /dev/null +++ b/test/client_tests/client_fetch_transport_test.dart @@ -0,0 +1,430 @@ +// Copyright (c) 2017, the gRPC project authors. Please see the AUTHORS file +// for details. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +@TestOn('browser') + +import 'dart:async'; + +import 'dart:html'; +import 'dart:typed_data'; + +import 'package:async/async.dart'; +import 'package:grpc/src/client/call.dart'; +import 'package:grpc/src/client/transport/fetch_transport.dart'; +import 'package:grpc/src/shared/message.dart'; +import 'package:grpc/src/shared/status.dart'; +import 'package:mockito/mockito.dart'; +import 'package:stream_transform/stream_transform.dart'; + +import 'package:test/test.dart'; + +class MockFetchRequest extends Mock implements FetchHttpRequest { + MockFetchRequest({int? code}) : status = code ?? 200; + // ignore: close_sinks + final readyStateChangeController = StreamController(); + // ignore: close_sinks + final progressController = StreamController(); + // ignore: close_sinks + final errorController = StreamController(); + + @override + Stream get onReadyStateChange => readyStateChangeController.stream; + + @override + Stream get onProgress => progressController.stream; + + @override + Stream get onError => errorController.stream; + + @override + final int status; + + @override + int get readyState => + super.noSuchMethod(Invocation.getter(#readyState), returnValue: -1); + + @override + String? get response => + super.noSuchMethod(Invocation.getter(#response), returnValue: null); + + @override + Map get responseHeaders => + super.noSuchMethod(Invocation.getter(#responseHeaders), + returnValue: {}); +} + +class MockFetchClientConnection extends FetchClientConnection { + MockFetchClientConnection({int? code}) + : _statusCode = code ?? 200, + super(Uri.parse('test:8080')); + + late MockFetchRequest latestRequest; + final int _statusCode; + + @override + FetchHttpRequest createRequest() { + final request = MockFetchRequest(code: _statusCode); + latestRequest = request; + return request; + } +} + +void main() { + test('Make request sends correct headers', () async { + final metadata = { + 'parameter_1': 'value_1', + 'parameter_2': 'value_2' + }; + + final connection = MockFetchClientConnection(); + + connection.makeRequest('path', Duration(seconds: 10), metadata, + (error, _) => fail(error.toString())); + + verify(connection.latestRequest + .setRequestHeader('Content-Type', 'application/grpc-web+proto')); + verify(connection.latestRequest + .setRequestHeader('X-User-Agent', 'grpc-web-dart/0.1')); + verify(connection.latestRequest.setRequestHeader('X-Grpc-Web', '1')); + verify(connection.latestRequest + .overrideMimeType('text/plain; charset=x-user-defined')); + verify(connection.latestRequest.responseType = 'text'); + }); + + test( + 'Make request sends correct headers and path if bypassCorsPreflight=true', + () async { + final metadata = {'header_1': 'value_1', 'header_2': 'value_2'}; + final connection = MockFetchClientConnection(); + + connection.makeRequest('path', Duration(seconds: 10), metadata, + (error, _) => fail(error.toString()), + callOptions: WebCallOptions(bypassCorsPreflight: true)); + + expect(metadata, isEmpty); + verify(connection.latestRequest.open('POST', + 'test:path?%24httpHeaders=header_1%3Avalue_1%0D%0Aheader_2%3Avalue_2%0D%0AContent-Type%3Aapplication%2Fgrpc-web%2Bproto%0D%0AX-User-Agent%3Agrpc-web-dart%2F0.1%0D%0AX-Grpc-Web%3A1%0D%0A')); + verify(connection.latestRequest + .overrideMimeType('text/plain; charset=x-user-defined')); + verify(connection.latestRequest.responseType = 'text'); + }); + + test( + 'Make request sends correct headers if call options already have ' + 'Content-Type header', () async { + final metadata = { + 'header_1': 'value_1', + 'header_2': 'value_2', + 'Content-Type': 'application/json+protobuf' + }; + final connection = MockFetchClientConnection(); + + connection.makeRequest('/path', Duration(seconds: 10), metadata, + (error, _) => fail(error.toString())); + + expect(metadata, { + 'header_1': 'value_1', + 'header_2': 'value_2', + 'Content-Type': 'application/json+protobuf', + }); + }); + + test('Content-Type header case insensitivity', () async { + final metadata = { + 'header_1': 'value_1', + 'CONTENT-TYPE': 'application/json+protobuf' + }; + final connection = MockFetchClientConnection(); + + connection.makeRequest('/path', Duration(seconds: 10), metadata, + (error, _) => fail(error.toString())); + expect(metadata, { + 'header_1': 'value_1', + 'CONTENT-TYPE': 'application/json+protobuf', + }); + + final lowerMetadata = { + 'header_1': 'value_1', + 'content-type': 'application/json+protobuf' + }; + connection.makeRequest('/path', Duration(seconds: 10), lowerMetadata, + (error, _) => fail(error.toString())); + expect(lowerMetadata, { + 'header_1': 'value_1', + 'content-type': 'application/json+protobuf', + }); + }); + + test('Make request sends correct headers path if only withCredentials=true', + () async { + final metadata = {'header_1': 'value_1', 'header_2': 'value_2'}; + final connection = MockFetchClientConnection(); + + connection.makeRequest('path', Duration(seconds: 10), metadata, + (error, _) => fail(error.toString()), + callOptions: WebCallOptions(withCredentials: true)); + + expect(metadata, { + 'header_1': 'value_1', + 'header_2': 'value_2', + 'Content-Type': 'application/grpc-web+proto', + 'X-User-Agent': 'grpc-web-dart/0.1', + 'X-Grpc-Web': '1' + }); + verify(connection.latestRequest + .setRequestHeader('Content-Type', 'application/grpc-web+proto')); + verify(connection.latestRequest + .setRequestHeader('X-User-Agent', 'grpc-web-dart/0.1')); + verify(connection.latestRequest.setRequestHeader('X-Grpc-Web', '1')); + verify(connection.latestRequest.open('POST', 'test:path')); + verify(connection.latestRequest.withCredentials = true); + verify(connection.latestRequest + .overrideMimeType('text/plain; charset=x-user-defined')); + verify(connection.latestRequest.responseType = 'text'); + }); + + test('Sent data converted to stream properly', () async { + final metadata = { + 'parameter_1': 'value_1', + 'parameter_2': 'value_2' + }; + + final connection = MockFetchClientConnection(); + + final stream = connection.makeRequest('path', Duration(seconds: 10), + metadata, (error, _) => fail(error.toString())); + + final data = List.filled(10, 0); + stream.outgoingMessages.add(data); + await stream.terminate(); + + final expectedData = frame(data); + expect(verify(connection.latestRequest.send(captureAny)).captured.single, + expectedData); + }); + + test('Stream handles headers properly', () async { + final responseHeaders = { + 'parameter_1': 'value_1', + 'parameter_2': 'value_2', + 'content-type': 'application/grpc+proto', + }; + + final transport = MockFetchClientConnection(); + + final stream = transport.makeRequest('test_path', Duration(seconds: 10), {}, + (error, _) => fail(error.toString())); + + when(transport.latestRequest.responseHeaders).thenReturn(responseHeaders); + when(transport.latestRequest.response) + .thenReturn(String.fromCharCodes(frame([]))); + + // Set expectation for request readyState and generate two readyStateChange + // events, so that incomingMessages stream completes. + final readyStates = [HttpRequest.HEADERS_RECEIVED, HttpRequest.DONE]; + when(transport.latestRequest.readyState) + .thenAnswer((_) => readyStates.removeAt(0)); + transport.latestRequest.readyStateChangeController.add(readyStates.first); + transport.latestRequest.readyStateChangeController.add(readyStates.first); + + // Should be only one metadata message with headers augmented with :status + // field. + final message = await stream.incomingMessages.single as GrpcMetadata; + expect(message.metadata, responseHeaders); + }); + + test('Stream handles trailers properly', () async { + final requestHeaders = { + 'parameter_1': 'value_1', + 'content-type': 'application/grpc+proto', + }; + final responseTrailers = { + 'trailer_1': 'value_1', + 'trailer_2': 'value_2', + }; + + final connection = MockFetchClientConnection(); + + final stream = connection.makeRequest('test_path', Duration(seconds: 10), + requestHeaders, (error, _) => fail(error.toString())); + + final encodedTrailers = frame(responseTrailers.entries + .map((e) => '${e.key}:${e.value}') + .join('\r\n') + .codeUnits); + encodedTrailers[0] = 0x80; // Mark this frame as trailers. + final encodedString = String.fromCharCodes(encodedTrailers); + + when(connection.latestRequest.responseHeaders).thenReturn(requestHeaders); + when(connection.latestRequest.response).thenReturn(encodedString); + + // Set expectation for request readyState and generate events so that + // incomingMessages stream completes. + final readyStates = [HttpRequest.HEADERS_RECEIVED, HttpRequest.DONE]; + when(connection.latestRequest.readyState) + .thenAnswer((_) => readyStates.removeAt(0)); + connection.latestRequest.readyStateChangeController.add(readyStates.first); + connection.latestRequest.progressController + .add(Uint8List.fromList(encodedTrailers)); + connection.latestRequest.readyStateChangeController.add(readyStates.first); + + // Should be two metadata messages: headers and trailers. + final messages = + await stream.incomingMessages.whereType().toList(); + expect(messages.length, 2); + expect(messages.first.metadata, requestHeaders); + expect(messages.last.metadata, responseTrailers); + }); + + test('Stream handles empty trailers properly', () async { + final requestHeaders = { + 'content-type': 'application/grpc+proto', + }; + + final connection = MockFetchClientConnection(); + + final stream = connection.makeRequest('test_path', Duration(seconds: 10), + {}, (error, _) => fail(error.toString())); + + final encoded = frame(''.codeUnits); + encoded[0] = 0x80; // Mark this frame as trailers. + final encodedString = String.fromCharCodes(encoded); + + when(connection.latestRequest.responseHeaders).thenReturn(requestHeaders); + when(connection.latestRequest.response).thenReturn(encodedString); + + // Set expectation for request readyState and generate events so that + // incomingMessages stream completes. + final readyStates = [HttpRequest.HEADERS_RECEIVED, HttpRequest.DONE]; + when(connection.latestRequest.readyState) + .thenAnswer((_) => readyStates.removeAt(0)); + connection.latestRequest.readyStateChangeController.add(readyStates.first); + connection.latestRequest.progressController + .add(Uint8List.fromList(encoded)); + connection.latestRequest.readyStateChangeController.add(readyStates.first); + + // Should be two metadata messages: headers and trailers. + final messages = + await stream.incomingMessages.whereType().toList(); + expect(messages.length, 2); + expect(messages.first.metadata, requestHeaders); + expect(messages.last.metadata, isEmpty); + }); + + test('Stream deserializes data properly', () async { + final requestHeaders = { + 'parameter_1': 'value_1', + 'parameter_2': 'value_2', + 'content-type': 'application/grpc+proto', + }; + + final connection = MockFetchClientConnection(); + + final stream = connection.makeRequest('test_path', Duration(seconds: 10), + requestHeaders, (error, _) => fail(error.toString())); + final data = List.filled(10, 224); + when(connection.latestRequest.responseHeaders).thenReturn(requestHeaders); + when(connection.latestRequest.response) + .thenReturn(String.fromCharCodes(frame(data))); + + // Set expectation for request readyState and generate events, so that + // incomingMessages stream completes. + final readyStates = [HttpRequest.HEADERS_RECEIVED, HttpRequest.DONE]; + when(connection.latestRequest.readyState) + .thenAnswer((_) => readyStates.removeAt(0)); + connection.latestRequest.readyStateChangeController.add(readyStates.first); + connection.latestRequest.progressController + .add(Uint8List.fromList(frame(data))); + connection.latestRequest.readyStateChangeController.add(readyStates.first); + + // Expect a single data message. + final message = await stream.incomingMessages.whereType().single; + expect(message.data, data); + }); + + test('GrpcError with error details in response', () async { + final connection = MockFetchClientConnection(code: 400); + final errors = []; + // The incoming messages stream never completes when there's an error, so + // using completer. + final errorReceived = Completer(); + connection.makeRequest('test_path', Duration(seconds: 10), {}, (e, _) { + errorReceived.complete(); + errors.add(e as GrpcError); + }); + const errorDetails = 'error details'; + when(connection.latestRequest.responseHeaders) + .thenReturn({'content-type': 'application/grpc+proto'}); + when(connection.latestRequest.readyState).thenReturn(HttpRequest.DONE); + when(connection.latestRequest.responseText).thenReturn(errorDetails); + connection.latestRequest.readyStateChangeController.add(HttpRequest.DONE); + await errorReceived; + expect(errors.single.rawResponse, errorDetails); + }); + + test('Stream receives multiple messages', () async { + final metadata = { + 'parameter_1': 'value_1', + 'parameter_2': 'value_2', + 'content-type': 'application/grpc+proto', + }; + + final connection = MockFetchClientConnection(); + + final stream = connection.makeRequest('test_path', Duration(seconds: 10), + metadata, (error, _) => fail(error.toString())); + + final data = >[ + List.filled(10, 224), + List.filled(5, 124) + ]; + final encodedStrings = + data.map((d) => String.fromCharCodes(frame(d))).toList(); + + when(connection.latestRequest.responseHeaders).thenReturn(metadata); + when(connection.latestRequest.readyState) + .thenReturn(HttpRequest.HEADERS_RECEIVED); + + // At first invocation the response should be the the first message, after + // that first + last messages. + var first = true; + when(connection.latestRequest.response).thenAnswer((_) { + if (first) { + first = false; + return encodedStrings[0]; + } + return encodedStrings[0] + encodedStrings[1]; + }); + + final readyStates = [HttpRequest.HEADERS_RECEIVED, HttpRequest.DONE]; + when(connection.latestRequest.readyState) + .thenAnswer((_) => readyStates.removeAt(0)); + + final queue = StreamQueue(stream.incomingMessages); + // Headers. + connection.latestRequest.readyStateChangeController.add(readyStates.first); + expect(((await queue.next) as GrpcMetadata).metadata, metadata); + // Data 1. + connection.latestRequest.progressController + .add(Uint8List.fromList(frame(data[0]))); + expect(((await queue.next) as GrpcData).data, data[0]); + // Data 2. + connection.latestRequest.progressController + .add(Uint8List.fromList(frame(data[1]))); + expect(((await queue.next) as GrpcData).data, data[1]); + // Done. + connection.latestRequest.readyStateChangeController.add(readyStates.first); + expect(await queue.hasNext, isFalse); + }); +}