Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit 8c97c8c

Browse files
author
Thibault Wittemberg
committedSep 26, 2021
operators: add fromAsync, fromThrowingAsync, fromAsyncSequence to bridge async/await with Combine
1 parent fc3e405 commit 8c97c8c

File tree

5 files changed

+609
-24
lines changed

5 files changed

+609
-24
lines changed
 

‎.github/workflows/tests.yml

+13-23
Original file line numberDiff line numberDiff line change
@@ -5,32 +5,31 @@ on: [push, pull_request, workflow_dispatch]
55
jobs:
66
xcode-tests:
77
name: "Test"
8-
runs-on: macOS-latest
8+
runs-on: macOS-11
99

1010
strategy:
1111
matrix:
12-
platform: [macOS, iOS, tvOS]
12+
# platform: [macOS, iOS, tvOS]
13+
platform: [iOS, tvOS]
1314
include:
14-
- platform: macOS
15-
sdk: macosx
16-
destination: "arch=x86_64"
17-
15+
# - platform: macOS
16+
# sdk: macosx11.3
17+
# destination: "arch=x86_64"
18+
#
1819
- platform: iOS
19-
sdk: iphonesimulator
20-
destination: "name=iPhone 11"
20+
sdk: iphoneos15.0
21+
destination: "name=iPhone 13"
2122

2223
- platform: tvOS
23-
sdk: appletvsimulator
24+
sdk: appletvsimulator15.0
2425
destination: "name=Apple TV"
2526

2627
steps:
2728
- uses: actions/checkout@v2
28-
- name: Select Xcode 12 (beta)
29-
run: sudo xcode-select -s /Applications/Xcode_12_beta.app
30-
- name: Generate project
31-
run: make project
29+
- name: Select Xcode 13
30+
run: sudo xcode-select -s /Applications/Xcode_13.0.app
3231
- name: Run tests
33-
run: set -o pipefail && xcodebuild -project CombineExt.xcodeproj -scheme CombineExt-Package -enableCodeCoverage YES -sdk ${{ matrix.sdk }} -destination "${{ matrix.destination }}" test | xcpretty -c -r html --output logs/${{ matrix.platform }}.html
32+
run: set -o pipefail && xcodebuild -scheme CombineExt-Package -enableCodeCoverage YES -sdk ${{ matrix.sdk }} -destination "${{ matrix.destination }}" test | xcpretty -c -r html --output logs/${{ matrix.platform }}.html
3433
- uses: codecov/codecov-action@v1.0.13
3534
with:
3635
token: 1519d58c-6fb9-483f-af6c-7f6f0b384345
@@ -39,12 +38,3 @@ jobs:
3938
with:
4039
name: build-logs-${{ github.run_id }}
4140
path: logs
42-
43-
SPM:
44-
name: "Test (SPM)"
45-
runs-on: macOS-latest
46-
47-
steps:
48-
- uses: actions/checkout@v2
49-
- name: Run tests
50-
run: set -o pipefail && swift test

‎CombineExt.podspec

+1-1
Original file line numberDiff line numberDiff line change
@@ -18,5 +18,5 @@ Pod::Spec.new do |s|
1818
s.source = { :git => "https://github.com/CombineCommunity/CombineExt.git", :tag => s.version }
1919
s.source_files = 'Sources/**/*.swift'
2020
s.frameworks = ['Combine']
21-
s.swift_version = '5.1'
21+
s.swift_version = '5.5'
2222
end

‎README.md

+95
Original file line numberDiff line numberDiff line change
@@ -755,6 +755,101 @@ subscription = ints
755755
.finished
756756
```
757757

758+
------
759+
760+
### fromAsync(priority:_:)
761+
762+
Creates a Combine Publisher from an async function. The Publisher emits a value and then completes when the async function returns its result.
763+
The task that supports the async function is canceled when the publisher's subscription is canceled.
764+
An optional priority indicates the priority of the Task supporting the execution of the async function.
765+
766+
```swift
767+
var value: Int {
768+
get async {
769+
3
770+
}
771+
}
772+
773+
Publishers
774+
.fromAsync {
775+
await value
776+
}.sink {
777+
print($0)
778+
} receiveValue: {
779+
print($0)
780+
}
781+
```
782+
783+
#### Output:
784+
785+
```none
786+
3
787+
finished
788+
```
789+
790+
------
791+
792+
### fromThrowingAsync(priority:_:)
793+
794+
Creates a Combine Publisher from a throwing async function
795+
The Publisher emits a value and completes or fail according the the async function execution result.
796+
The task that supports the async function is canceled when the publisher's subscription is canceled.
797+
An optional priority indicates the priority of the Task supporting the execution of the async function.
798+
799+
```swift
800+
struct MyError: Error, CustomStringConvertible {
801+
var description: String {
802+
"Async Error"
803+
}
804+
}
805+
806+
Publishers
807+
.fromAsync { () async throws -> String in
808+
throw MyError()
809+
}.sink {
810+
print($0)
811+
} receiveValue: {
812+
print($0)
813+
}
814+
```
815+
816+
#### Output:
817+
818+
```none
819+
failure(Async Error)
820+
```
821+
822+
### fromAsyncSequence(priority:_:)
823+
824+
Creates a Combine Publisher from an async sequence.
825+
The Publisher emits values or fail according the the async sequence execution result.
826+
An optional priority indicates the priority of the Task supporting the execution of the async sequence.
827+
828+
```swift
829+
let sequence = AsyncStream(Int.self) { continuation in
830+
continuation.yield(1)
831+
continuation.yield(2)
832+
continuation.yield(3)
833+
continuation.finish()
834+
}
835+
836+
Publishers
837+
.fromAsyncSequence(sequence).sink {
838+
print($0)
839+
} receiveValue: {
840+
print($0)
841+
}
842+
```
843+
844+
#### Output:
845+
846+
```none
847+
1
848+
2
849+
3
850+
finished
851+
```
852+
758853
## Publishers
759854

760855
This section outlines some of the custom Combine publishers CombineExt provides

‎Sources/Operators/FromAsync.swift

+191
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,191 @@
1+
//
2+
// FromAsync.swift
3+
// CombineExt
4+
//
5+
// Created by Thibault Wittemberg on 2021-06-15.
6+
// Copyright © 2021 Combine Community. All rights reserved.
7+
//
8+
9+
#if canImport(Combine)
10+
import Combine
11+
12+
@available(macOS 12.0, iOS 15.0, tvOS 15.0, watchOS 8.0, *)
13+
public extension Publishers {
14+
/// Creates a Combine Publisher from an async function
15+
/// The Publisher emits a value and then completes when the async function returns its result.
16+
/// The task that supports the async function is canceled when the publisher's subscription is canceled.
17+
/// ```
18+
/// var value: Int {
19+
/// get async {
20+
/// 3
21+
/// }
22+
/// }
23+
///
24+
/// Publishers
25+
/// .fromAsync {
26+
/// await value
27+
/// }.sink {
28+
/// print($0)
29+
/// } receiveValue: {
30+
/// print($0)
31+
/// }
32+
///
33+
/// // will print:
34+
/// // 3
35+
/// // finished
36+
/// ```
37+
/// - parameter priority: Optional value indicating the priority of the Task supporting the execution of the async function
38+
/// - Returns: The Combine Publisher wrapping the async function execution
39+
static func fromAsync<Output>(priority: TaskPriority? = nil,
40+
_ asyncFunction: @escaping () async -> Output) -> AnyPublisher<Output, Never> {
41+
AnyPublisher<Output, Never>.create { subscriber in
42+
let task = Task(priority: priority) {
43+
let result = await asyncFunction()
44+
subscriber.send(result)
45+
subscriber.send(completion: .finished)
46+
}
47+
48+
return AnyCancellable {
49+
task.cancel()
50+
}
51+
}
52+
}
53+
54+
/// Creates a Combine Publisher from a throwing async function
55+
/// The Publisher emits a value or fail according the the async function execution result.
56+
/// The task that supports the async function is canceled when the publisher's subscription is canceled.
57+
///
58+
/// ```
59+
/// var value: Int {
60+
/// get async {
61+
/// 3
62+
/// }
63+
/// }
64+
///
65+
/// Publishers
66+
/// .fromAsync {
67+
/// await value
68+
/// }.sink {
69+
/// print($0)
70+
/// } receiveValue: {
71+
/// print($0)
72+
/// }
73+
///
74+
/// // will print:
75+
/// // 3
76+
/// // finished
77+
/// ```
78+
///
79+
/// Whenever the async function throws an error, the stream will faile:
80+
///
81+
/// ```
82+
/// struct MyError: Error, CustomStringConvertible {
83+
/// var description: String {
84+
/// "Async Error"
85+
/// }
86+
/// }
87+
///
88+
/// Publishers
89+
/// .fromAsync { () async throws -> String in
90+
/// throw MyError()
91+
/// }.sink {
92+
/// print($0)
93+
/// } receiveValue: {
94+
/// print($0)
95+
/// }
96+
///
97+
/// // will print:
98+
/// // failure(Async Error)
99+
///```
100+
/// - parameter priority: Optional value indicating the priority of the Task supporting the execution of the async function
101+
/// - Returns: The Combine Publisher wrapping the async function execution
102+
static func fromThrowingAsync<Output>(priority: TaskPriority? = nil,
103+
_ asyncThrowingFunction: @escaping () async throws -> Output) -> AnyPublisher<Output, Error> {
104+
AnyPublisher<Output, Error>.create { subscriber in
105+
let task = Task(priority: priority) {
106+
do {
107+
let result = try await asyncThrowingFunction()
108+
subscriber.send(result)
109+
subscriber.send(completion: .finished)
110+
} catch {
111+
subscriber.send(completion: .failure(error))
112+
}
113+
}
114+
115+
return AnyCancellable {
116+
task.cancel()
117+
}
118+
}
119+
}
120+
121+
/// Creates a Combine Publisher from an async sequence.
122+
/// The Publisher emits values or fail according the the async sequence execution result.
123+
///
124+
/// ```
125+
/// let sequence = [1, 2, 3].publisher.values
126+
///
127+
/// Publishers
128+
/// .fromAsyncSequence(sequence).sink {
129+
/// print($0)
130+
/// } receiveValue: {
131+
/// print($0)
132+
/// }
133+
///
134+
/// // will print:
135+
/// // 1
136+
/// // 2
137+
/// // 3
138+
/// // finished
139+
/// ```
140+
///
141+
/// If the asyncSequence faild:
142+
///
143+
/// ```
144+
/// struct MyError: Error, CustomStringConvertible {
145+
/// var description: String {
146+
/// "Async Error"
147+
/// }
148+
/// }
149+
///
150+
/// let sequence = AsyncThrowingStream(Int.self) { continuation in
151+
/// continuation.yield(1)
152+
/// continuation.yield(2)
153+
/// continuation.finish(throwing: MockError(value: Int.random(in: 1...100)))
154+
/// }
155+
///
156+
/// Publishers
157+
/// .fromAsyncSequence(sequence).sink {
158+
/// print($0)
159+
/// } receiveValue: {
160+
/// print($0)
161+
/// }
162+
///
163+
/// // will print:
164+
/// // 1
165+
/// // 2
166+
/// // failure(Async Error)
167+
///```
168+
/// - parameter priority: Optional value indicating the priority of the Task supporting the async sequence execution
169+
/// - Returns: The Combine Publisher wrapping the async sequence iteration
170+
static func fromAsyncSequence<Output, AsyncSequenceType>(priority: TaskPriority? = nil,
171+
_ asyncSequence: AsyncSequenceType) -> AnyPublisher<Output, Error>
172+
where AsyncSequenceType: AsyncSequence, AsyncSequenceType.Element == Output {
173+
AnyPublisher<Output, Error>.create { subscriber in
174+
let task = Task(priority: priority) {
175+
do {
176+
for try await result in asyncSequence {
177+
subscriber.send(result)
178+
}
179+
subscriber.send(completion: .finished)
180+
} catch {
181+
subscriber.send(completion: .failure(error))
182+
}
183+
}
184+
185+
return AnyCancellable {
186+
task.cancel()
187+
}
188+
}
189+
}
190+
}
191+
#endif

0 commit comments

Comments
 (0)
Please sign in to comment.