From d718b1706a00df0ea7333c9258255129bbb2a3aa Mon Sep 17 00:00:00 2001 From: Anders Ha Date: Sun, 19 Apr 2020 22:40:39 +0100 Subject: [PATCH 1/6] Convert any Signal, Producer or Property to a Combine publisher via `publisher()`. --- ReactiveSwift.xcodeproj/project.pbxproj | 10 ++ Sources/CombineInteroperability.swift | 179 ++++++++++++++++++++++++ 2 files changed, 189 insertions(+) create mode 100644 Sources/CombineInteroperability.swift diff --git a/ReactiveSwift.xcodeproj/project.pbxproj b/ReactiveSwift.xcodeproj/project.pbxproj index fdea7d3bf..915d3e495 100644 --- a/ReactiveSwift.xcodeproj/project.pbxproj +++ b/ReactiveSwift.xcodeproj/project.pbxproj @@ -66,6 +66,10 @@ 9A1D067D1D948A2300ACF44C /* UnidirectionalBindingSpec.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A1D067C1D948A2200ACF44C /* UnidirectionalBindingSpec.swift */; }; 9A1D067E1D948A2300ACF44C /* UnidirectionalBindingSpec.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A1D067C1D948A2200ACF44C /* UnidirectionalBindingSpec.swift */; }; 9A1D067F1D948A2300ACF44C /* UnidirectionalBindingSpec.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A1D067C1D948A2200ACF44C /* UnidirectionalBindingSpec.swift */; }; + 9A5865DF244CEF9800AADB58 /* CombineInteroperability.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A5865DE244CEF9800AADB58 /* CombineInteroperability.swift */; }; + 9A5865E0244CEF9800AADB58 /* CombineInteroperability.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A5865DE244CEF9800AADB58 /* CombineInteroperability.swift */; }; + 9A5865E1244CEF9800AADB58 /* CombineInteroperability.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A5865DE244CEF9800AADB58 /* CombineInteroperability.swift */; }; + 9A5865E2244CEF9800AADB58 /* CombineInteroperability.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A5865DE244CEF9800AADB58 /* CombineInteroperability.swift */; }; 9A67963B1F6056B90058C5B4 /* UninhabitedTypeGuards.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A67963A1F6056B90058C5B4 /* UninhabitedTypeGuards.swift */; }; 9A67963C1F6059420058C5B4 /* UninhabitedTypeGuards.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A67963A1F6056B90058C5B4 /* UninhabitedTypeGuards.swift */; }; 9A67963D1F6059430058C5B4 /* UninhabitedTypeGuards.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A67963A1F6056B90058C5B4 /* UninhabitedTypeGuards.swift */; }; @@ -240,6 +244,7 @@ 9A1A4F981E16961C006F3039 /* ValidatingPropertySpec.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = ValidatingPropertySpec.swift; sourceTree = ""; }; 9A1B824020835EEC00EB7C09 /* ResultExtensions.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = ResultExtensions.swift; sourceTree = ""; }; 9A1D067C1D948A2200ACF44C /* UnidirectionalBindingSpec.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = UnidirectionalBindingSpec.swift; sourceTree = ""; }; + 9A5865DE244CEF9800AADB58 /* CombineInteroperability.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = CombineInteroperability.swift; sourceTree = ""; }; 9A67963A1F6056B90058C5B4 /* UninhabitedTypeGuards.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = UninhabitedTypeGuards.swift; sourceTree = ""; }; 9A681A9D1E5A241B00B097CF /* DeprecationSpec.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = DeprecationSpec.swift; sourceTree = ""; }; 9A9100DE1E0E6E620093E346 /* ValidatingProperty.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = ValidatingProperty.swift; sourceTree = ""; }; @@ -454,6 +459,7 @@ 9A090C131DA0309E00EE97CA /* Reactive.swift */, D0C312C819EF2A5800984962 /* Scheduler.swift */, C79B647B1CD52E23003F2376 /* EventLogger.swift */, + 9A5865DE244CEF9800AADB58 /* CombineInteroperability.swift */, D03B4A3919F4C25F009E02AC /* Signals */, D03B4A3B19F4C281009E02AC /* Extensions */, 9ABCB1841D2A5B5A00BCA243 /* Deprecations+Removals.swift */, @@ -869,6 +875,7 @@ 57A4D1B61BA13D7A00F7D4B1 /* Event.swift in Sources */, 57A4D1B81BA13D7A00F7D4B1 /* Scheduler.swift in Sources */, 9A9100E21E0E6E680093E346 /* ValidatingProperty.swift in Sources */, + 9A5865E2244CEF9800AADB58 /* CombineInteroperability.swift in Sources */, 57A4D1B91BA13D7A00F7D4B1 /* Action.swift in Sources */, 57A4D1BA1BA13D7A00F7D4B1 /* Property.swift in Sources */, 9A090C171DA0309E00EE97CA /* Reactive.swift in Sources */, @@ -924,6 +931,7 @@ A9B315BE1B3940810001CB9C /* Event.swift in Sources */, A9B315C01B3940810001CB9C /* Scheduler.swift in Sources */, 9A9100E11E0E6E680093E346 /* ValidatingProperty.swift in Sources */, + 9A5865E1244CEF9800AADB58 /* CombineInteroperability.swift in Sources */, A9B315C11B3940810001CB9C /* Action.swift in Sources */, A9B315C21B3940810001CB9C /* Property.swift in Sources */, 9A090C161DA0309E00EE97CA /* Reactive.swift in Sources */, @@ -952,6 +960,7 @@ D0C312D319EF2A5800984962 /* Disposable.swift in Sources */, 9A9100DF1E0E6E620093E346 /* ValidatingProperty.swift in Sources */, EBCC7DBC1BBF010C00A2AE92 /* Observer.swift in Sources */, + 9A5865DF244CEF9800AADB58 /* CombineInteroperability.swift in Sources */, D03B4A3D19F4C39A009E02AC /* FoundationExtensions.swift in Sources */, 9A090C141DA0309E00EE97CA /* Reactive.swift in Sources */, D08C54B31A69A2AE00AD8286 /* Signal.swift in Sources */, @@ -1007,6 +1016,7 @@ D0C312D419EF2A5800984962 /* Disposable.swift in Sources */, D08C54B91A69A9D100AD8286 /* SignalProducer.swift in Sources */, 9A9100E01E0E6E670093E346 /* ValidatingProperty.swift in Sources */, + 9A5865E0244CEF9800AADB58 /* CombineInteroperability.swift in Sources */, 9ABCB1861D2A5B5A00BCA243 /* Deprecations+Removals.swift in Sources */, EBCC7DBD1BBF01E100A2AE92 /* Observer.swift in Sources */, 9A090C151DA0309E00EE97CA /* Reactive.swift in Sources */, diff --git a/Sources/CombineInteroperability.swift b/Sources/CombineInteroperability.swift new file mode 100644 index 000000000..ca6d17056 --- /dev/null +++ b/Sources/CombineInteroperability.swift @@ -0,0 +1,179 @@ +#if canImport(Combine) +import Combine + +extension SignalProducerConvertible { + @available(macOS 10.15, *) + @available(iOS 13.0, *) + @available(tvOS 13.0, *) + @available(macCatalyst 13.0, *) + @available(watchOS 6.0, *) + public func publisher() -> ProducerPublisher { + ProducerPublisher(base: producer) + } +} + +@available(macOS 10.15, *) +@available(iOS 13.0, *) +@available(tvOS 13.0, *) +@available(macCatalyst 13.0, *) +@available(watchOS 6.0, *) +public struct ProducerPublisher: Publisher { + public let base: SignalProducer + + public init(base: SignalProducer) { + self.base = base + } + + public func receive(subscriber: S) where S : Subscriber, Output == S.Input, Failure == S.Failure { + let subscription = Subscription(subscriber: subscriber, base: base) + subscription.bootstrap() + } + + final class Subscription: Combine.Subscription where Output == S.Input, Failure == S.Failure { + let subscriber: S + let base: SignalProducer + let state: Atomic + + init(subscriber: S, base: SignalProducer) { + self.subscriber = subscriber + self.base = base + self.state = Atomic(State()) + } + + func bootstrap() { + subscriber.receive(subscription: self) + } + + func request(_ incoming: Subscribers.Demand) { + let response: DemandResponse = state.modify { state in + guard state.hasCancelled == false else { + return .noAction + } + + guard state.hasStarted else { + state.hasStarted = true + state.requested = incoming + return .startUpstream + } + + state.requested = state.requested + incoming + let unsatified = state.requested - state.satisfied + + if let max = unsatified.max { + let dequeueCount = Swift.min(state.buffer.count, max) + state.satisfied += dequeueCount + + defer { state.buffer.removeFirst(dequeueCount) } + return .satisfyDemand(Array(state.buffer.prefix(dequeueCount))) + } else { + defer { state.buffer = [] } + return .satisfyDemand(state.buffer) + } + } + + switch response { + case let .satisfyDemand(output): + var demand: Subscribers.Demand = .none + + for output in output { + demand += subscriber.receive(output) + } + + if demand != .none { + request(demand) + } + + case .startUpstream: + let disposable = base.start { [weak self] event in + guard let self = self else { return } + + switch event { + case let .value(output): + let (shouldSendImmediately, isDemandUnlimited): (Bool, Bool) = self.state.modify { state in + guard state.hasCancelled == false else { return (false, false) } + + let unsatified = state.requested - state.satisfied + + if let count = unsatified.max, count >= 1 { + assert(state.buffer.count == 0) + state.satisfied += 1 + return (true, false) + } else if unsatified == .unlimited { + assert(state.buffer.isEmpty) + return (true, true) + } else { + assert(state.requested == state.satisfied) + state.buffer.append(output) + return (false, false) + } + } + + if shouldSendImmediately { + let demand = self.subscriber.receive(output) + + if isDemandUnlimited == false && demand != .none { + self.request(demand) + } + } + + case .completed, .interrupted: + self.cancel() + self.subscriber.receive(completion: .finished) + + case let .failed(error): + self.cancel() + self.subscriber.receive(completion: .failure(error)) + } + } + + let shouldDispose: Bool = state.modify { state in + guard state.hasCancelled == false else { return true } + state.producerSubscription = disposable + return false + } + + if shouldDispose { + disposable.dispose() + } + + case .noAction: + break + } + } + + func cancel() { + let disposable = state.modify { $0.cancel() } + disposable?.dispose() + } + + struct State { + var requested: Subscribers.Demand = .none + var satisfied: Subscribers.Demand = .none + + var buffer: [Output] = [] + + var producerSubscription: Disposable? + var hasStarted = false + var hasCancelled = false + + init() { + producerSubscription = nil + hasStarted = false + hasCancelled = false + } + + mutating func cancel() -> Disposable? { + hasCancelled = true + defer { producerSubscription = nil } + return producerSubscription + } + } + + enum DemandResponse { + case startUpstream + case satisfyDemand([Output]) + case noAction + } + } +} +#endif From 6a0a6814ad926aac50496b43137a8dbeeb3e263d Mon Sep 17 00:00:00 2001 From: Anders Ha Date: Sun, 19 Apr 2020 22:51:36 +0100 Subject: [PATCH 2/6] Convert Combine publishers to producer via `producer()`. A few utilities. --- .gitignore | 1 + ReactiveSwift.xcodeproj/project.pbxproj | 48 +++++++++++++++---- .../CombineInteroperability/FromCombine.swift | 21 ++++++++ .../ToCombine.swift} | 0 .../CombineInteroperability/Utilities.swift | 28 +++++++++++ 5 files changed, 88 insertions(+), 10 deletions(-) create mode 100644 Sources/CombineInteroperability/FromCombine.swift rename Sources/{CombineInteroperability.swift => CombineInteroperability/ToCombine.swift} (100%) create mode 100644 Sources/CombineInteroperability/Utilities.swift diff --git a/.gitignore b/.gitignore index 619d777c2..c9934c0b4 100644 --- a/.gitignore +++ b/.gitignore @@ -23,6 +23,7 @@ PlaygroundUtility.remap # SwiftPM .build Packages +.swiftpm # Carthage Carthage/Build diff --git a/ReactiveSwift.xcodeproj/project.pbxproj b/ReactiveSwift.xcodeproj/project.pbxproj index 915d3e495..50c56ed94 100644 --- a/ReactiveSwift.xcodeproj/project.pbxproj +++ b/ReactiveSwift.xcodeproj/project.pbxproj @@ -66,10 +66,18 @@ 9A1D067D1D948A2300ACF44C /* UnidirectionalBindingSpec.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A1D067C1D948A2200ACF44C /* UnidirectionalBindingSpec.swift */; }; 9A1D067E1D948A2300ACF44C /* UnidirectionalBindingSpec.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A1D067C1D948A2200ACF44C /* UnidirectionalBindingSpec.swift */; }; 9A1D067F1D948A2300ACF44C /* UnidirectionalBindingSpec.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A1D067C1D948A2200ACF44C /* UnidirectionalBindingSpec.swift */; }; - 9A5865DF244CEF9800AADB58 /* CombineInteroperability.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A5865DE244CEF9800AADB58 /* CombineInteroperability.swift */; }; - 9A5865E0244CEF9800AADB58 /* CombineInteroperability.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A5865DE244CEF9800AADB58 /* CombineInteroperability.swift */; }; - 9A5865E1244CEF9800AADB58 /* CombineInteroperability.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A5865DE244CEF9800AADB58 /* CombineInteroperability.swift */; }; - 9A5865E2244CEF9800AADB58 /* CombineInteroperability.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A5865DE244CEF9800AADB58 /* CombineInteroperability.swift */; }; + 9A5865DF244CEF9800AADB58 /* ToCombine.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A5865DE244CEF9800AADB58 /* ToCombine.swift */; }; + 9A5865E0244CEF9800AADB58 /* ToCombine.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A5865DE244CEF9800AADB58 /* ToCombine.swift */; }; + 9A5865E1244CEF9800AADB58 /* ToCombine.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A5865DE244CEF9800AADB58 /* ToCombine.swift */; }; + 9A5865E2244CEF9800AADB58 /* ToCombine.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A5865DE244CEF9800AADB58 /* ToCombine.swift */; }; + 9A5865E5244CFD4900AADB58 /* FromCombine.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A5865E4244CFD4900AADB58 /* FromCombine.swift */; }; + 9A5865E6244CFD4900AADB58 /* FromCombine.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A5865E4244CFD4900AADB58 /* FromCombine.swift */; }; + 9A5865E7244CFD4900AADB58 /* FromCombine.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A5865E4244CFD4900AADB58 /* FromCombine.swift */; }; + 9A5865E8244CFD4900AADB58 /* FromCombine.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A5865E4244CFD4900AADB58 /* FromCombine.swift */; }; + 9A5865EA244CFE9000AADB58 /* Utilities.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A5865E9244CFE9000AADB58 /* Utilities.swift */; }; + 9A5865EB244CFE9000AADB58 /* Utilities.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A5865E9244CFE9000AADB58 /* Utilities.swift */; }; + 9A5865EC244CFE9000AADB58 /* Utilities.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A5865E9244CFE9000AADB58 /* Utilities.swift */; }; + 9A5865ED244CFE9000AADB58 /* Utilities.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A5865E9244CFE9000AADB58 /* Utilities.swift */; }; 9A67963B1F6056B90058C5B4 /* UninhabitedTypeGuards.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A67963A1F6056B90058C5B4 /* UninhabitedTypeGuards.swift */; }; 9A67963C1F6059420058C5B4 /* UninhabitedTypeGuards.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A67963A1F6056B90058C5B4 /* UninhabitedTypeGuards.swift */; }; 9A67963D1F6059430058C5B4 /* UninhabitedTypeGuards.swift in Sources */ = {isa = PBXBuildFile; fileRef = 9A67963A1F6056B90058C5B4 /* UninhabitedTypeGuards.swift */; }; @@ -244,7 +252,9 @@ 9A1A4F981E16961C006F3039 /* ValidatingPropertySpec.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = ValidatingPropertySpec.swift; sourceTree = ""; }; 9A1B824020835EEC00EB7C09 /* ResultExtensions.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = ResultExtensions.swift; sourceTree = ""; }; 9A1D067C1D948A2200ACF44C /* UnidirectionalBindingSpec.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = UnidirectionalBindingSpec.swift; sourceTree = ""; }; - 9A5865DE244CEF9800AADB58 /* CombineInteroperability.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = CombineInteroperability.swift; sourceTree = ""; }; + 9A5865DE244CEF9800AADB58 /* ToCombine.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = ToCombine.swift; sourceTree = ""; }; + 9A5865E4244CFD4900AADB58 /* FromCombine.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = FromCombine.swift; sourceTree = ""; }; + 9A5865E9244CFE9000AADB58 /* Utilities.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = Utilities.swift; sourceTree = ""; }; 9A67963A1F6056B90058C5B4 /* UninhabitedTypeGuards.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = UninhabitedTypeGuards.swift; sourceTree = ""; }; 9A681A9D1E5A241B00B097CF /* DeprecationSpec.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = DeprecationSpec.swift; sourceTree = ""; }; 9A9100DE1E0E6E620093E346 /* ValidatingProperty.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = ValidatingProperty.swift; sourceTree = ""; }; @@ -385,6 +395,16 @@ path = tvOS; sourceTree = ""; }; + 9A5865E3244CFD2A00AADB58 /* CombineInteroperability */ = { + isa = PBXGroup; + children = ( + 9A5865DE244CEF9800AADB58 /* ToCombine.swift */, + 9A5865E4244CFD4900AADB58 /* FromCombine.swift */, + 9A5865E9244CFE9000AADB58 /* Utilities.swift */, + ); + path = CombineInteroperability; + sourceTree = ""; + }; A97451321B3A935E00F48E55 /* watchOS */ = { isa = PBXGroup; children = ( @@ -459,7 +479,7 @@ 9A090C131DA0309E00EE97CA /* Reactive.swift */, D0C312C819EF2A5800984962 /* Scheduler.swift */, C79B647B1CD52E23003F2376 /* EventLogger.swift */, - 9A5865DE244CEF9800AADB58 /* CombineInteroperability.swift */, + 9A5865E3244CFD2A00AADB58 /* CombineInteroperability */, D03B4A3919F4C25F009E02AC /* Signals */, D03B4A3B19F4C281009E02AC /* Extensions */, 9ABCB1841D2A5B5A00BCA243 /* Deprecations+Removals.swift */, @@ -875,7 +895,8 @@ 57A4D1B61BA13D7A00F7D4B1 /* Event.swift in Sources */, 57A4D1B81BA13D7A00F7D4B1 /* Scheduler.swift in Sources */, 9A9100E21E0E6E680093E346 /* ValidatingProperty.swift in Sources */, - 9A5865E2244CEF9800AADB58 /* CombineInteroperability.swift in Sources */, + 9A5865E2244CEF9800AADB58 /* ToCombine.swift in Sources */, + 9A5865ED244CFE9000AADB58 /* Utilities.swift in Sources */, 57A4D1B91BA13D7A00F7D4B1 /* Action.swift in Sources */, 57A4D1BA1BA13D7A00F7D4B1 /* Property.swift in Sources */, 9A090C171DA0309E00EE97CA /* Reactive.swift in Sources */, @@ -888,6 +909,7 @@ 57A4D1C01BA13D7A00F7D4B1 /* FoundationExtensions.swift in Sources */, D85C652D1C0E70E5005A77AD /* Flatten.swift in Sources */, 9ABCB1881D2A5B5A00BCA243 /* Deprecations+Removals.swift in Sources */, + 9A5865E8244CFD4900AADB58 /* FromCombine.swift in Sources */, EBCC7DBF1BBF01E200A2AE92 /* Observer.swift in Sources */, C79B64801CD52E4E003F2376 /* EventLogger.swift in Sources */, 4A0E11021D2A92720065D310 /* Lifetime.swift in Sources */, @@ -931,7 +953,8 @@ A9B315BE1B3940810001CB9C /* Event.swift in Sources */, A9B315C01B3940810001CB9C /* Scheduler.swift in Sources */, 9A9100E11E0E6E680093E346 /* ValidatingProperty.swift in Sources */, - 9A5865E1244CEF9800AADB58 /* CombineInteroperability.swift in Sources */, + 9A5865E1244CEF9800AADB58 /* ToCombine.swift in Sources */, + 9A5865EC244CFE9000AADB58 /* Utilities.swift in Sources */, A9B315C11B3940810001CB9C /* Action.swift in Sources */, A9B315C21B3940810001CB9C /* Property.swift in Sources */, 9A090C161DA0309E00EE97CA /* Reactive.swift in Sources */, @@ -944,6 +967,7 @@ A9B315C81B3940810001CB9C /* FoundationExtensions.swift in Sources */, D85C652C1C0E70E4005A77AD /* Flatten.swift in Sources */, 9ABCB1871D2A5B5A00BCA243 /* Deprecations+Removals.swift in Sources */, + 9A5865E7244CFD4900AADB58 /* FromCombine.swift in Sources */, EBCC7DBE1BBF01E200A2AE92 /* Observer.swift in Sources */, C79B647F1CD52E4D003F2376 /* EventLogger.swift in Sources */, 4A0E11011D2A92720065D310 /* Lifetime.swift in Sources */, @@ -960,7 +984,8 @@ D0C312D319EF2A5800984962 /* Disposable.swift in Sources */, 9A9100DF1E0E6E620093E346 /* ValidatingProperty.swift in Sources */, EBCC7DBC1BBF010C00A2AE92 /* Observer.swift in Sources */, - 9A5865DF244CEF9800AADB58 /* CombineInteroperability.swift in Sources */, + 9A5865DF244CEF9800AADB58 /* ToCombine.swift in Sources */, + 9A5865EA244CFE9000AADB58 /* Utilities.swift in Sources */, D03B4A3D19F4C39A009E02AC /* FoundationExtensions.swift in Sources */, 9A090C141DA0309E00EE97CA /* Reactive.swift in Sources */, D08C54B31A69A2AE00AD8286 /* Signal.swift in Sources */, @@ -973,6 +998,7 @@ D0C312CD19EF2A5800984962 /* Atomic.swift in Sources */, D08C54BA1A69C54300AD8286 /* Property.swift in Sources */, D0D11AB91A6AE87700C1F8B1 /* Action.swift in Sources */, + 9A5865E5244CFD4900AADB58 /* FromCombine.swift in Sources */, C79B647C1CD52E23003F2376 /* EventLogger.swift in Sources */, 9ABCB1851D2A5B5A00BCA243 /* Deprecations+Removals.swift in Sources */, D08C54B81A69A9D000AD8286 /* SignalProducer.swift in Sources */, @@ -1016,7 +1042,8 @@ D0C312D419EF2A5800984962 /* Disposable.swift in Sources */, D08C54B91A69A9D100AD8286 /* SignalProducer.swift in Sources */, 9A9100E01E0E6E670093E346 /* ValidatingProperty.swift in Sources */, - 9A5865E0244CEF9800AADB58 /* CombineInteroperability.swift in Sources */, + 9A5865E0244CEF9800AADB58 /* ToCombine.swift in Sources */, + 9A5865EB244CFE9000AADB58 /* Utilities.swift in Sources */, 9ABCB1861D2A5B5A00BCA243 /* Deprecations+Removals.swift in Sources */, EBCC7DBD1BBF01E100A2AE92 /* Observer.swift in Sources */, 9A090C151DA0309E00EE97CA /* Reactive.swift in Sources */, @@ -1029,6 +1056,7 @@ D08C54B71A69A3DB00AD8286 /* Event.swift in Sources */, C79B647D1CD52E4A003F2376 /* EventLogger.swift in Sources */, D0C312CE19EF2A5800984962 /* Atomic.swift in Sources */, + 9A5865E6244CFD4900AADB58 /* FromCombine.swift in Sources */, D0C312E819EF2A5800984962 /* Scheduler.swift in Sources */, D0C312D019EF2A5800984962 /* Bag.swift in Sources */, D0D11ABA1A6AE87700C1F8B1 /* Action.swift in Sources */, diff --git a/Sources/CombineInteroperability/FromCombine.swift b/Sources/CombineInteroperability/FromCombine.swift new file mode 100644 index 000000000..c2c02f297 --- /dev/null +++ b/Sources/CombineInteroperability/FromCombine.swift @@ -0,0 +1,21 @@ +#if canImport(Combine) +import Combine + +extension Publisher { + public func producer() -> SignalProducer { + return SignalProducer { observer, lifetime in + lifetime += self.sink( + receiveCompletion: { completion in + switch completion { + case let .failure(error): + observer.send(error: error) + case .finished: + observer.sendCompleted() + } + }, + receiveValue: observer.send(value:) + ) + } + } +} +#endif diff --git a/Sources/CombineInteroperability.swift b/Sources/CombineInteroperability/ToCombine.swift similarity index 100% rename from Sources/CombineInteroperability.swift rename to Sources/CombineInteroperability/ToCombine.swift diff --git a/Sources/CombineInteroperability/Utilities.swift b/Sources/CombineInteroperability/Utilities.swift new file mode 100644 index 000000000..45a9782ca --- /dev/null +++ b/Sources/CombineInteroperability/Utilities.swift @@ -0,0 +1,28 @@ +#if canImport(Combine) +import Combine + +extension Lifetime { + @discardableResult + public static func += (lhs: Lifetime, rhs: C) -> Disposable? { + lhs.observeEnded(rhs.cancel) + } +} + +extension AnyDisposable: Cancellable { + public func cancel() { + dispose() + } +} + +extension SerialDisposable: Cancellable { + public func cancel() { + dispose() + } +} + +extension CompositeDisposable: Cancellable { + public func cancel() { + dispose() + } +} +#endif From 4cffe30999f8e08df9577f7ffe9183458ec43478 Mon Sep 17 00:00:00 2001 From: Anders Ha Date: Sun, 19 Apr 2020 23:04:44 +0100 Subject: [PATCH 3/6] Update the availability annotations. --- Sources/CombineInteroperability/FromCombine.swift | 1 + Sources/CombineInteroperability/ToCombine.swift | 12 ++---------- Sources/CombineInteroperability/Utilities.swift | 4 ++++ 3 files changed, 7 insertions(+), 10 deletions(-) diff --git a/Sources/CombineInteroperability/FromCombine.swift b/Sources/CombineInteroperability/FromCombine.swift index c2c02f297..ecafc598c 100644 --- a/Sources/CombineInteroperability/FromCombine.swift +++ b/Sources/CombineInteroperability/FromCombine.swift @@ -1,6 +1,7 @@ #if canImport(Combine) import Combine +@available(macOS 10.15, iOS 13.0, tvOS 13.0, macCatalyst 13.0, watchOS 6.0, *) extension Publisher { public func producer() -> SignalProducer { return SignalProducer { observer, lifetime in diff --git a/Sources/CombineInteroperability/ToCombine.swift b/Sources/CombineInteroperability/ToCombine.swift index ca6d17056..f14f37adf 100644 --- a/Sources/CombineInteroperability/ToCombine.swift +++ b/Sources/CombineInteroperability/ToCombine.swift @@ -2,21 +2,13 @@ import Combine extension SignalProducerConvertible { - @available(macOS 10.15, *) - @available(iOS 13.0, *) - @available(tvOS 13.0, *) - @available(macCatalyst 13.0, *) - @available(watchOS 6.0, *) + @available(macOS 10.15, iOS 13.0, tvOS 13.0, macCatalyst 13.0, watchOS 6.0, *) public func publisher() -> ProducerPublisher { ProducerPublisher(base: producer) } } -@available(macOS 10.15, *) -@available(iOS 13.0, *) -@available(tvOS 13.0, *) -@available(macCatalyst 13.0, *) -@available(watchOS 6.0, *) +@available(macOS 10.15, iOS 13.0, tvOS 13.0, macCatalyst 13.0, watchOS 6.0, *) public struct ProducerPublisher: Publisher { public let base: SignalProducer diff --git a/Sources/CombineInteroperability/Utilities.swift b/Sources/CombineInteroperability/Utilities.swift index 45a9782ca..2ffd46440 100644 --- a/Sources/CombineInteroperability/Utilities.swift +++ b/Sources/CombineInteroperability/Utilities.swift @@ -2,24 +2,28 @@ import Combine extension Lifetime { + @available(macOS 10.15, iOS 13.0, tvOS 13.0, macCatalyst 13.0, watchOS 6.0, *) @discardableResult public static func += (lhs: Lifetime, rhs: C) -> Disposable? { lhs.observeEnded(rhs.cancel) } } +@available(macOS 10.15, iOS 13.0, tvOS 13.0, macCatalyst 13.0, watchOS 6.0, *) extension AnyDisposable: Cancellable { public func cancel() { dispose() } } +@available(macOS 10.15, iOS 13.0, tvOS 13.0, macCatalyst 13.0, watchOS 6.0, *) extension SerialDisposable: Cancellable { public func cancel() { dispose() } } +@available(macOS 10.15, iOS 13.0, tvOS 13.0, macCatalyst 13.0, watchOS 6.0, *) extension CompositeDisposable: Cancellable { public func cancel() { dispose() From d7b1a5d7d3b8982bffe3f2043157f4def68fe385 Mon Sep 17 00:00:00 2001 From: Anders Ha Date: Sun, 26 Apr 2020 21:37:02 +0100 Subject: [PATCH 4/6] Remove `Cancellable` conformances. --- .../CombineInteroperability/Utilities.swift | 26 +++---------------- 1 file changed, 3 insertions(+), 23 deletions(-) diff --git a/Sources/CombineInteroperability/Utilities.swift b/Sources/CombineInteroperability/Utilities.swift index 2ffd46440..8bd707b4f 100644 --- a/Sources/CombineInteroperability/Utilities.swift +++ b/Sources/CombineInteroperability/Utilities.swift @@ -4,29 +4,9 @@ import Combine extension Lifetime { @available(macOS 10.15, iOS 13.0, tvOS 13.0, macCatalyst 13.0, watchOS 6.0, *) @discardableResult - public static func += (lhs: Lifetime, rhs: C) -> Disposable? { - lhs.observeEnded(rhs.cancel) - } -} - -@available(macOS 10.15, iOS 13.0, tvOS 13.0, macCatalyst 13.0, watchOS 6.0, *) -extension AnyDisposable: Cancellable { - public func cancel() { - dispose() - } -} - -@available(macOS 10.15, iOS 13.0, tvOS 13.0, macCatalyst 13.0, watchOS 6.0, *) -extension SerialDisposable: Cancellable { - public func cancel() { - dispose() - } -} - -@available(macOS 10.15, iOS 13.0, tvOS 13.0, macCatalyst 13.0, watchOS 6.0, *) -extension CompositeDisposable: Cancellable { - public func cancel() { - dispose() + public static func += (lhs: Lifetime, rhs: C?) -> Disposable? { + rhs.flatMap { lhs.observeEnded($0.cancel) } } } #endif + From d47129e0374f6131b75950f44b05e2251f877c82 Mon Sep 17 00:00:00 2001 From: Anders Ha Date: Mon, 27 Apr 2020 10:47:00 +0100 Subject: [PATCH 5/6] Add `eraseToAnyPublisher()`. --- Sources/CombineInteroperability/ToCombine.swift | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/Sources/CombineInteroperability/ToCombine.swift b/Sources/CombineInteroperability/ToCombine.swift index f14f37adf..26891caac 100644 --- a/Sources/CombineInteroperability/ToCombine.swift +++ b/Sources/CombineInteroperability/ToCombine.swift @@ -2,6 +2,11 @@ import Combine extension SignalProducerConvertible { + @available(macOS 10.15, iOS 13.0, tvOS 13.0, macCatalyst 13.0, watchOS 6.0, *) + public func eraseToAnyPublisher() -> AnyPublisher { + publisher().eraseToAnyPublisher() + } + @available(macOS 10.15, iOS 13.0, tvOS 13.0, macCatalyst 13.0, watchOS 6.0, *) public func publisher() -> ProducerPublisher { ProducerPublisher(base: producer) From dcbf57226dd2d00c5deac3d32370603774c09d9a Mon Sep 17 00:00:00 2001 From: Anders Ha Date: Mon, 27 Apr 2020 10:49:48 +0100 Subject: [PATCH 6/6] Remove one level of indentation. --- .../CombineInteroperability/ToCombine.swift | 231 +++++++++--------- 1 file changed, 117 insertions(+), 114 deletions(-) diff --git a/Sources/CombineInteroperability/ToCombine.swift b/Sources/CombineInteroperability/ToCombine.swift index 26891caac..cf4cd6734 100644 --- a/Sources/CombineInteroperability/ToCombine.swift +++ b/Sources/CombineInteroperability/ToCombine.swift @@ -22,155 +22,158 @@ public struct ProducerPublisher: Publisher { } public func receive(subscriber: S) where S : Subscriber, Output == S.Input, Failure == S.Failure { - let subscription = Subscription(subscriber: subscriber, base: base) + let subscription = ProducerSubscription(subscriber: subscriber, base: base) subscription.bootstrap() } +} - final class Subscription: Combine.Subscription where Output == S.Input, Failure == S.Failure { - let subscriber: S - let base: SignalProducer - let state: Atomic +final class ProducerSubscription: Combine.Subscription { + typealias Output = S.Output + typealias Failure = S.Failure + + let subscriber: S + let base: SignalProducer + let state: Atomic - init(subscriber: S, base: SignalProducer) { - self.subscriber = subscriber - self.base = base - self.state = Atomic(State()) - } + init(subscriber: S, base: SignalProducer) { + self.subscriber = subscriber + self.base = base + self.state = Atomic(State()) + } - func bootstrap() { - subscriber.receive(subscription: self) - } + func bootstrap() { + subscriber.receive(subscription: self) + } - func request(_ incoming: Subscribers.Demand) { - let response: DemandResponse = state.modify { state in - guard state.hasCancelled == false else { - return .noAction - } + func request(_ incoming: Subscribers.Demand) { + let response: DemandResponse = state.modify { state in + guard state.hasCancelled == false else { + return .noAction + } - guard state.hasStarted else { - state.hasStarted = true - state.requested = incoming - return .startUpstream - } + guard state.hasStarted else { + state.hasStarted = true + state.requested = incoming + return .startUpstream + } - state.requested = state.requested + incoming - let unsatified = state.requested - state.satisfied + state.requested = state.requested + incoming + let unsatified = state.requested - state.satisfied - if let max = unsatified.max { - let dequeueCount = Swift.min(state.buffer.count, max) - state.satisfied += dequeueCount + if let max = unsatified.max { + let dequeueCount = Swift.min(state.buffer.count, max) + state.satisfied += dequeueCount - defer { state.buffer.removeFirst(dequeueCount) } - return .satisfyDemand(Array(state.buffer.prefix(dequeueCount))) - } else { - defer { state.buffer = [] } - return .satisfyDemand(state.buffer) - } + defer { state.buffer.removeFirst(dequeueCount) } + return .satisfyDemand(Array(state.buffer.prefix(dequeueCount))) + } else { + defer { state.buffer = [] } + return .satisfyDemand(state.buffer) } + } - switch response { - case let .satisfyDemand(output): - var demand: Subscribers.Demand = .none + switch response { + case let .satisfyDemand(output): + var demand: Subscribers.Demand = .none - for output in output { - demand += subscriber.receive(output) - } + for output in output { + demand += subscriber.receive(output) + } - if demand != .none { - request(demand) - } + if demand != .none { + request(demand) + } - case .startUpstream: - let disposable = base.start { [weak self] event in - guard let self = self else { return } - - switch event { - case let .value(output): - let (shouldSendImmediately, isDemandUnlimited): (Bool, Bool) = self.state.modify { state in - guard state.hasCancelled == false else { return (false, false) } - - let unsatified = state.requested - state.satisfied - - if let count = unsatified.max, count >= 1 { - assert(state.buffer.count == 0) - state.satisfied += 1 - return (true, false) - } else if unsatified == .unlimited { - assert(state.buffer.isEmpty) - return (true, true) - } else { - assert(state.requested == state.satisfied) - state.buffer.append(output) - return (false, false) - } + case .startUpstream: + let disposable = base.start { [weak self] event in + guard let self = self else { return } + + switch event { + case let .value(output): + let (shouldSendImmediately, isDemandUnlimited): (Bool, Bool) = self.state.modify { state in + guard state.hasCancelled == false else { return (false, false) } + + let unsatified = state.requested - state.satisfied + + if let count = unsatified.max, count >= 1 { + assert(state.buffer.count == 0) + state.satisfied += 1 + return (true, false) + } else if unsatified == .unlimited { + assert(state.buffer.isEmpty) + return (true, true) + } else { + assert(state.requested == state.satisfied) + state.buffer.append(output) + return (false, false) } + } - if shouldSendImmediately { - let demand = self.subscriber.receive(output) + if shouldSendImmediately { + let demand = self.subscriber.receive(output) - if isDemandUnlimited == false && demand != .none { - self.request(demand) - } + if isDemandUnlimited == false && demand != .none { + self.request(demand) } - - case .completed, .interrupted: - self.cancel() - self.subscriber.receive(completion: .finished) - - case let .failed(error): - self.cancel() - self.subscriber.receive(completion: .failure(error)) } - } - let shouldDispose: Bool = state.modify { state in - guard state.hasCancelled == false else { return true } - state.producerSubscription = disposable - return false - } + case .completed, .interrupted: + self.cancel() + self.subscriber.receive(completion: .finished) - if shouldDispose { - disposable.dispose() + case let .failed(error): + self.cancel() + self.subscriber.receive(completion: .failure(error)) } + } - case .noAction: - break + let shouldDispose: Bool = state.modify { state in + guard state.hasCancelled == false else { return true } + state.producerSubscription = disposable + return false } - } - func cancel() { - let disposable = state.modify { $0.cancel() } - disposable?.dispose() + if shouldDispose { + disposable.dispose() + } + + case .noAction: + break } + } - struct State { - var requested: Subscribers.Demand = .none - var satisfied: Subscribers.Demand = .none + func cancel() { + let disposable = state.modify { $0.cancel() } + disposable?.dispose() + } - var buffer: [Output] = [] + struct State { + var requested: Subscribers.Demand = .none + var satisfied: Subscribers.Demand = .none - var producerSubscription: Disposable? - var hasStarted = false - var hasCancelled = false + var buffer: [Output] = [] - init() { - producerSubscription = nil - hasStarted = false - hasCancelled = false - } + var producerSubscription: Disposable? + var hasStarted = false + var hasCancelled = false - mutating func cancel() -> Disposable? { - hasCancelled = true - defer { producerSubscription = nil } - return producerSubscription - } + init() { + producerSubscription = nil + hasStarted = false + hasCancelled = false } - enum DemandResponse { - case startUpstream - case satisfyDemand([Output]) - case noAction + mutating func cancel() -> Disposable? { + hasCancelled = true + defer { producerSubscription = nil } + return producerSubscription } } + + enum DemandResponse { + case startUpstream + case satisfyDemand([Output]) + case noAction + } } #endif