diff --git a/CHANGELOG.md b/CHANGELOG.md index 811a90b7d..7b0ad4d56 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ All notable changes to this project will be documented in this file. ## Unreleased +* Add convinience `Infallible` operator for setting `SchedulerType`(`subscribe(on:)`, `observe(on:)`). #3340 * Use `AtomicInt` for `BooleanDisposable`s to prevent potential rase condition. #2419 * Renames 'OSX' to 'macOS' in Availability Check. * Renames 'OSXApplicationExtension' to 'macOSApplicationExtension' in Availability Check. diff --git a/RxSwift/Traits/Infallible/Infallible.swift b/RxSwift/Traits/Infallible/Infallible.swift index d5b5826a6..535c1ec3d 100644 --- a/RxSwift/Traits/Infallible/Infallible.swift +++ b/RxSwift/Traits/Infallible/Infallible.swift @@ -19,7 +19,7 @@ public protocol InfallibleType: ObservableConvertibleType {} /// Unlike `SharedSequence`, it does not share its resources or /// replay its events, but acts as a standard `Observable`. public struct Infallible: InfallibleType { - private let source: Observable + let source: Observable init(_ source: Observable) { self.source = source @@ -100,3 +100,42 @@ extension InfallibleType { return self.asObservable().subscribe(eventHandler) } } + + +extension Infallible { + /** + Wraps the source sequence in order to run its subscription and unsubscription logic on the specified + scheduler. + + This operation is not commonly used. + + This only performs the side-effects of subscription and unsubscription on the specified scheduler. + + In order to invoke observer callbacks on a `scheduler`, use `observeOn`. + + - seealso: [subscribeOn operator on reactivex.io](http://reactivex.io/documentation/operators/subscribeon.html) + + - parameter scheduler: Scheduler to perform subscription and unsubscription actions on. + - returns: The source sequence whose subscriptions and unsubscriptions happen on the specified scheduler. + */ + public func subscribe(on scheduler: ImmediateSchedulerType) + -> Infallible { + Infallible(self.source.subscribe(on: scheduler)) + } + + /** + Wraps the source sequence in order to run its observer callbacks on the specified scheduler. + + This only invokes observer callbacks on a `scheduler`. In case the subscription and/or unsubscription + actions have side-effects that require to be run on a scheduler, use `subscribeOn`. + + - seealso: [observeOn operator on reactivex.io](http://reactivex.io/documentation/operators/observeon.html) + + - parameter scheduler: Scheduler to notify observers on. + - returns: The source sequence whose observations happen on the specified scheduler. + */ + public func observe(on scheduler: ImmediateSchedulerType) + -> Infallible { + Infallible(self.source.observe(on: scheduler)) + } +} diff --git a/Tests/RxSwiftTests/Infallible+Tests.swift b/Tests/RxSwiftTests/Infallible+Tests.swift index 69dcb86b4..4c8294a57 100644 --- a/Tests/RxSwiftTests/Infallible+Tests.swift +++ b/Tests/RxSwiftTests/Infallible+Tests.swift @@ -329,6 +329,32 @@ extension InfallibleTest { testObject = nil XCTAssertNil(testObject) } + + func test_observeOn() { + let scheduler = TestScheduler(initialClock: 0) + + let res = scheduler.start { + Infallible.just(1).observe(on:scheduler) + } + + XCTAssertEqual(res.events, [ + .next(201, 1), + .completed(202) + ]) + } + + func test_subscribeOn() { + let scheduler = TestScheduler(initialClock: 0) + + let res = scheduler.start { + Infallible.just(1).subscribe(on: scheduler) + } + + XCTAssertEqual(res.events, [ + .next(201, 1), + .completed(201) + ]) + } } private class TestObject: NSObject {