diff --git a/RxSwift/Observables/Multicast.swift b/RxSwift/Observables/Multicast.swift index 1ebcf0212..2d0df4dc8 100644 --- a/RxSwift/Observables/Multicast.swift +++ b/RxSwift/Observables/Multicast.swift @@ -229,13 +229,15 @@ final private class ConnectableObservableAdapter } private var lazySubject: Subject { - if let subject = self.subject { + lock.performLocked { + if let subject = self.subject { + return subject + } + + let subject = self.makeSubject() + self.subject = subject return subject } - - let subject = self.makeSubject() - self.subject = subject - return subject } override func subscribe(_ observer: Observer) -> Disposable where Observer.Element == Subject.Element { diff --git a/RxSwift/Observables/ShareReplayScope.swift b/RxSwift/Observables/ShareReplayScope.swift index 59d440dc3..8ed1bdb79 100644 --- a/RxSwift/Observables/ShareReplayScope.swift +++ b/RxSwift/Observables/ShareReplayScope.swift @@ -169,7 +169,7 @@ private final class ShareReplay1WhileConnectedConnection private let lock: RecursiveLock private var disposed: Bool = false fileprivate var observers = Observers() - private var element: Element? + fileprivate var element: Element? init(parent: Parent, lock: RecursiveLock) { self.parent = parent @@ -205,18 +205,6 @@ private final class ShareReplay1WhileConnectedConnection self.subscription.setDisposable(self.parent.source.subscribe(self)) } - final func synchronized_subscribe(_ observer: Observer) -> Disposable where Observer.Element == Element { - self.lock.performLocked { - if let element = self.element { - observer.on(.next(element)) - } - - let disposeKey = self.observers.insert(observer.on) - - return SubscriptionDisposable(owner: self, key: disposeKey) - } - } - final private func synchronized_dispose() { self.disposed = true if self.parent.connection === self { @@ -274,14 +262,20 @@ final private class ShareReplay1WhileConnected let connection = self.synchronized_subscribe(observer) let count = connection.observers.count - let disposable = connection.synchronized_subscribe(observer) + let disposeKey = connection.observers.insert(observer.on) + + let initialValueToReplay = connection.element self.lock.unlock() + if let initialValueToReplay { + observer.on(.next(initialValueToReplay)) + } + if count == 0 { connection.connect() } - return disposable + return SubscriptionDisposable(owner: connection, key: disposeKey) } @inline(__always) @@ -414,8 +408,8 @@ final private class ShareWhileConnected let connection = self.synchronized_subscribe(observer) let count = connection.observers.count - let disposable = connection.synchronized_subscribe(observer) self.lock.unlock() + let disposable = connection.synchronized_subscribe(observer) if count == 0 { connection.connect() diff --git a/Tests/RxSwiftTests/Anomalies.swift b/Tests/RxSwiftTests/Anomalies.swift index 4e1cd204e..3f8e81d77 100644 --- a/Tests/RxSwiftTests/Anomalies.swift +++ b/Tests/RxSwiftTests/Anomalies.swift @@ -176,4 +176,88 @@ extension AnomaliesTest { performSharingOperatorsTest(share: op) } } + + func test2653ShareReplayOneInitialEmissionDeadlock() { + let immediatelyEmittingSource = Observable.create { observer in + observer.on(.next(())) + return Disposables.create() + } + .share(replay: 1, scope: .whileConnected) + + let exp = createInitialEmissionsDeadlockExpectation( + sourceName: "`share(replay: 1, scope: .whileConnected)`", + immediatelyEmittingSource: immediatelyEmittingSource + ) + + wait(for: [exp], timeout: 5) + } + + func test2653ShareReplayMoreInitialEmissionDeadlock() { + let immediatelyEmittingSource = Observable.create { observer in + observer.on(.next(())) + return Disposables.create() + } + .share(replay: 2, scope: .whileConnected) + + let exp = createInitialEmissionsDeadlockExpectation( + sourceName: "`share(replay: 2, scope: .whileConnected)`", + immediatelyEmittingSource: immediatelyEmittingSource + ) + + wait(for: [exp], timeout: 5) + } + + func test2653ShareReplayOneForeverInitialEmissionDeadlock() { + let immediatelyEmittingSource = Observable.create { observer in + observer.on(.next(())) + return Disposables.create() + } + .share(replay: 1, scope: .forever) + + let exp = createInitialEmissionsDeadlockExpectation( + sourceName: "`share(replay: 1, scope: .forever)`", + immediatelyEmittingSource: immediatelyEmittingSource + ) + + wait(for: [exp], timeout: 5) + } + + func test2653ShareReplayMoreForeverInitialEmissionDeadlock() { + let immediatelyEmittingSource = Observable.create { observer in + observer.on(.next(())) + return Disposables.create() + } + .share(replay: 2, scope: .forever) + + let exp = createInitialEmissionsDeadlockExpectation( + sourceName: "`share(replay: 2, scope: .forever)`", + immediatelyEmittingSource: immediatelyEmittingSource + ) + + wait(for: [exp], timeout: 5) + } + + private func createInitialEmissionsDeadlockExpectation( + sourceName: String, + immediatelyEmittingSource: Observable + ) -> XCTestExpectation { + let exp = expectation(description: "`\(sourceName)` doesn't cause a deadlock in multithreaded environment because it doesn't keep its lock acquired to replay values upon subscription") + + let triggerRange = 0..<1000 + + let multipleSubscriptions = Observable.zip(triggerRange.map { _ in + Observable.just(()) + .observe(on: ConcurrentDispatchQueueScheduler(qos: .userInitiated)) + .flatMap { _ in + immediatelyEmittingSource + } + .take(1) + }) + + _ = multipleSubscriptions.subscribe(onCompleted: { + exp.fulfill() + }) + + return exp + } }