Skip to content

- #2661

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed

- #2661

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ All notable changes to this project will be documented in this file.
* Provides `Infallible` versions of `combineLatest` without `resultSelector` requirement.
* Provides `Infallible` versions of `CombineLatest+Collection` helpers.
* Explicitly declare `APPLICATION_EXTENSION_API_ONLY` for CocoaPods
* Ensure `AsyncSequence.asObservable()` runs on background thread using `Task.detached`.

## 6.5.0

Expand Down
2 changes: 1 addition & 1 deletion RxSwift/Observable+Concurrency.swift
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public extension AsyncSequence {
/// - returns: An `Observable` of the async sequence's type
func asObservable() -> Observable<Element> {
Observable.create { observer in
let task = Task {
let task = Task.detached {
do {
for try await value in self {
observer.onNext(value)
Expand Down
106 changes: 106 additions & 0 deletions Tests/RxSwiftTests/Observable+ConcurrencyTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -90,5 +90,111 @@ extension ObservableConcurrencyTests {
task.cancel()
}

func testAsyncSequenceToObservableRunsOnBackgroundThread() async throws {

let asyncSequence = AsyncStream<Int> { continuation in
for i in 1...5 {
continuation.yield(i)
}
continuation.finish()
}

let expectation = XCTestExpectation(description: "Observable completes")

DispatchQueue.main.async {
let observable = asyncSequence.asObservable()

var threadIsNotMain = false
var values = [Int]()

_ = observable.subscribe(
onNext: { value in
values.append(value)
threadIsNotMain = !Thread.isMainThread
},
onCompleted: {
XCTAssertEqual(values, [1, 2, 3, 4, 5])
XCTAssertTrue(threadIsNotMain, "AsyncSequence.asObservable should not run on main thread")
expectation.fulfill()
}
)
}

await fulfillment(of: [expectation], timeout: 5.0)
}

func testAsyncSequenceToObservableWithSleep() async throws {
let asyncSequence = AsyncStream<Int> { continuation in
Task {
for i in 1...3 {
try? await Task.sleep(nanoseconds: 100_000_000)
continuation.yield(i)
}
continuation.finish()
}
}

let expectation = XCTestExpectation(description: "Observable with sleep completes")

DispatchQueue.main.async {
let startTime = Date()
var values = [Int]()
var executionThreads = Set<String>()

_ = asyncSequence.asObservable().subscribe(
onNext: { value in
values.append(value)
let threadName = Thread.current.description
executionThreads.insert(threadName)
},
onCompleted: {
let duration = Date().timeIntervalSince(startTime)
XCTAssertGreaterThanOrEqual(duration, 0.3)
XCTAssertEqual(values, [1, 2, 3])
XCTAssertFalse(executionThreads.contains(where: { $0.contains("main") }))

expectation.fulfill()
}
)
}

await fulfillment(of: [expectation], timeout: 5.0)
}

func testAsyncSequenceToObservableWithError() async throws {
struct TestError: Error {}

let asyncSequence = AsyncThrowingStream<Int, Error> { continuation in
for i in 1...3 {
continuation.yield(i)
}
continuation.finish(throwing: TestError())
}

let expectation = XCTestExpectation(description: "Observable with error completes")

var values = [Int]()
var receivedError: Error?
var threadIsNotMain = false

DispatchQueue.main.async {
_ = asyncSequence.asObservable().subscribe(
onNext: { value in
values.append(value)
threadIsNotMain = !Thread.isMainThread
},
onError: { error in
receivedError = error
XCTAssertEqual(values, [1, 2, 3])
XCTAssertTrue(threadIsNotMain, "Error handler should not run on main thread")
expectation.fulfill()
}
)
}

await fulfillment(of: [expectation], timeout: 5.0)
XCTAssertTrue(receivedError is TestError)
}

}
#endif