diff --git a/drift/lib/src/remote/protocol.dart b/drift/lib/src/remote/protocol.dart index d6ddb99fd..f1eacb511 100644 --- a/drift/lib/src/remote/protocol.dart +++ b/drift/lib/src/remote/protocol.dart @@ -54,7 +54,8 @@ class DriftProtocol { } Message deserialize(Object message) { - if (message is! List) throw const FormatException('Cannot read message'); + if (message is! List) + throw FormatException('Cannot read message ${message.runtimeType}'); final tag = message[0]; final id = message[1] as int; diff --git a/drift/lib/web/worker.dart b/drift/lib/web/worker.dart index 510055c2c..a59463284 100644 --- a/drift/lib/web/worker.dart +++ b/drift/lib/web/worker.dart @@ -4,10 +4,15 @@ /// For more details on how to use this library, see [the documentation]. /// /// [the documentation]: https://drift.simonbinder.eu/web/#using-web-workers +// ignore_for_file: public_member_api_docs + library drift.web.workers; import 'dart:async'; import 'dart:html'; +// import 'dart:html'; +import 'dart:js_interop'; +import 'dart:js_interop_unsafe'; import 'package:async/async.dart'; import 'package:drift/drift.dart'; @@ -115,18 +120,7 @@ enum DriftWorkerMode { /// contains additional information and an example on how to use workers with /// Dart and Drift. void driftWorkerMain(QueryExecutor Function() openConnection) { - final self = WorkerGlobalScope.instance; - _RunningDriftWorker worker; - - if (self is SharedWorkerGlobalScope) { - worker = _RunningDriftWorker(true, openConnection); - } else if (self is DedicatedWorkerGlobalScope) { - worker = _RunningDriftWorker(false, openConnection); - } else { - throw StateError('This worker is neither a shared nor a dedicated worker'); - } - - worker.start(); + _RunningDriftWorker(true, openConnection).start(); } /// Spawn or connect to a web worker written with [driftWorkerMain]. @@ -154,10 +148,16 @@ Future connectToDriftWorker(String workerJsUri, worker.postMessage(webChannel.port1, [webChannel.port1]); channel = webChannel.port2.channel(); } else { - final worker = SharedWorker(workerJsUri, 'drift database'); - final port = worker.port!; - - var didGetInitializationResponse = false; + final chrome = + (WorkerGlobalScope.instance as JSObject).getProperty("chrome".toJS); + final runtime = (chrome as JSObject).getProperty("runtime".toJS); + final jsPort = (runtime as JSObject) + .callMethod("connect".toJS, ChromeConnectInfo(name: "drift database")); + final port = jsPort as ChromeRuntimePort; + // final worker = SharedWorker(workerJsUri, 'drift database'); + // final port = worker.port!; + + var didGetInitializationResponse = true; port.postMessage(mode.name); channel = port.channel().transformStream(StreamTransformer.fromHandlers( handleData: (data, sink) { @@ -185,6 +185,92 @@ Future connectToDriftWorker(String workerJsUri, return connectToRemoteAndInitialize(channel); } +extension type ChromeConnectInfo._(JSObject _) implements JSObject { + external String get name; + + external ChromeConnectInfo({required String name}); +} + +@JS() +@staticInterop +class ChromeRuntimePort {} + +extension ChromeRuntimePortX on ChromeRuntimePort { + external void disconnect(); + @JS('postMessage') + external void _postMessage(JSAny? message); + + void postMessage(Object? message) => _postMessage(message.jsify()); + + external void close(); + + external String get name; + + external ChromePortOnMessage get onMessage; +} + +@JS() +@staticInterop +class ChromePortOnMessage {} + +@JS() +@staticInterop +class ChromePortOnDisconnect {} + +extension ChromeRuntimePortOnMessageX on ChromePortOnMessage { + @JS('addListener') + external void _addListener(JSExportedDartFunction listener); + // external void _removeListener(JSExportedDartFunction listener); + + void addListener(ChromePortOnMessageListener listener) => + _addListener(listener.toJS); +} + +typedef ChromePortOnMessageListener = void Function( + JSAny? message, ChromeRuntimePort port); + +const _disconnectMessage = '_disconnect'; + +/// Extension to transform a raw [MessagePort] from web workers into a Dart +/// [StreamChannel]. +extension PortToChannel on ChromeRuntimePort { + /// Converts this port to a two-way communication channel, exposed as a + /// [StreamChannel]. + /// + /// This can be used to implement a remote database connection over service + /// workers. + /// + /// The [explicitClose] parameter can be used to control whether a close + /// message should be sent through the channel when it is closed. This will + /// cause it to be closed on the other end as well. Note that this is not a + /// reliable way of determining channel closures though, as there is no event + /// for channels being closed due to a tab or worker being closed. + /// Both "ends" of a JS channel calling [channel] on their part must use the + /// value for [explicitClose]. + StreamChannel channel({bool explicitClose = false}) { + final controller = StreamChannelController(); + onMessage.addListener((message, port) { + if (explicitClose && message == _disconnectMessage) { + // Other end has closed the connection + controller.local.sink.close(); + } else { + controller.local.sink.add(message.dartify()); + } + }); + + controller.local.stream.listen(postMessage, onDone: () { + // Closed locally, inform the other end. + if (explicitClose) { + postMessage(_disconnectMessage); + } + + close(); + }); + + return controller.foreign; + } +} + class _RunningDriftWorker { final bool isShared; final QueryExecutor Function() connectionFactory; @@ -196,10 +282,22 @@ class _RunningDriftWorker { _RunningDriftWorker(this.isShared, this.connectionFactory); void start() { + print("DOING DRIFT START"); if (isShared) { - const event = EventStreamProvider('connect'); - event.forTarget(self).listen(_newConnection); + final chrome = (self as JSObject).getProperty("chrome".toJS); + final runtime = (chrome as JSObject).getProperty("runtime".toJS); + final onConnect = (runtime as JSObject).getProperty("onConnect".toJS); + (onConnect as JSObject).callMethod( + "addListener".toJS, + ((ChromeRuntimePort port) { + print("CONNECTED"); + _newConnection(port); + }).toJS, + ); + // const event = EventStreamProvider('connect'); + // event.forTarget(self).listen(_newConnection); } else { + print("DRIFT START NOT SHARED"); const event = EventStreamProvider('message'); event.forTarget(self).map((e) => e.data).listen(_handleMessage); } @@ -226,9 +324,8 @@ class _RunningDriftWorker { } /// Handle a new connection, which implies that this worker is shared. - void _newConnection(MessageEvent event) { + void _newConnection(ChromeRuntimePort outgoingPort) { assert(isShared); - final outgoingPort = event.ports.first; // We still don't know whether this shared worker is supposed to host the // server itself or whether this is delegated to a dedicated worker managed diff --git a/drift/pubspec.yaml b/drift/pubspec.yaml index ba3bf91a7..3b6cb51e4 100644 --- a/drift/pubspec.yaml +++ b/drift/pubspec.yaml @@ -27,7 +27,8 @@ dev_dependencies: build_runner_core: ^7.0.0 build_verify: ^3.0.0 build_web_compilers: ^4.0.3 - drift_dev: any + drift_dev: + path: ../drift_dev drift_testcases: path: ../extras/integration_tests/drift_testcases http: ^0.13.4