Skip to content

Commit d3aced9

Browse files
committed
Adds Publishers.ConcatMap
1 parent c3c6607 commit d3aced9

File tree

2 files changed

+148
-4
lines changed

2 files changed

+148
-4
lines changed

Sources/Operators/ConcatMap.swift

+140-2
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
#if canImport(Combine)
1010
import Combine
11+
import class Foundation.NSRecursiveLock
1112

1213
@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
1314
public extension Publisher {
@@ -20,8 +21,145 @@ public extension Publisher {
2021
/// - returns: A publisher emitting the values of all emitted publishers in order.
2122
func concatMap<T, P>(
2223
_ transform: @escaping (Self.Output) -> P
23-
) -> Publishers.FlatMap<P, Self> where T == P.Output, P: Publisher, Self.Failure == P.Failure {
24-
flatMap(maxPublishers: .max(1), transform)
24+
) -> Publishers.ConcatMap<P, Self> where T == P.Output, P: Publisher, Self.Failure == P.Failure {
25+
return Publishers.ConcatMap(upstream: self, transform: transform)
26+
}
27+
}
28+
29+
@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
30+
public extension Publishers {
31+
final class ConcatMap<NewPublisher, Upstream>: Publisher where NewPublisher: Publisher, Upstream: Publisher, NewPublisher.Failure == Upstream.Failure {
32+
public typealias Transform = (Upstream.Output) -> NewPublisher
33+
public typealias Output = NewPublisher.Output
34+
public typealias Failure = Upstream.Failure
35+
36+
public let upstream: Upstream
37+
public let transform: Transform
38+
39+
public init(
40+
upstream: Upstream,
41+
transform: @escaping Transform
42+
) {
43+
self.upstream = upstream
44+
self.transform = transform
45+
}
46+
47+
public func receive<S: Subscriber>(subscriber: S)
48+
where Output == S.Input, Failure == S.Failure {
49+
subscriber.receive(
50+
subscription: Subscription(
51+
upstream: upstream,
52+
transform: transform,
53+
downstream: subscriber
54+
)
55+
)
56+
}
57+
}
58+
}
59+
60+
// MARK: - Subscription
61+
@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
62+
private extension Publishers.ConcatMap {
63+
final class Subscription<Downstream: Subscriber>: Combine.Subscription where Downstream.Input == Output, Downstream.Failure == Failure {
64+
private var sink: Sink<Downstream>?
65+
66+
init(
67+
upstream: Upstream,
68+
transform: @escaping Transform,
69+
downstream: Downstream
70+
) {
71+
self.sink = Sink(
72+
upstream: upstream,
73+
downstream: downstream,
74+
transform: { transform($0) }
75+
)
76+
}
77+
78+
func request(_ demand: Subscribers.Demand) {
79+
sink?.demand(demand)
80+
}
81+
82+
func cancel() {
83+
sink = nil
84+
}
85+
}
86+
}
87+
88+
// MARK: - Sink
89+
@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
90+
private extension Publishers.ConcatMap {
91+
final class Sink<Downstream: Subscriber>: CombineExt.Sink<Upstream, Downstream>
92+
where Downstream.Input == Output, Downstream.Failure == Failure {
93+
private let lock = NSRecursiveLock()
94+
private let transform: Transform
95+
private var activePublisher: NewPublisher?
96+
private var bufferedPublishers: [NewPublisher]
97+
private var cancellables: Set<AnyCancellable>
98+
99+
init(
100+
upstream: Upstream,
101+
downstream: Downstream,
102+
transform: @escaping Transform
103+
) {
104+
self.transform = transform
105+
self.bufferedPublishers = []
106+
self.cancellables = []
107+
super.init(
108+
upstream: upstream,
109+
downstream: downstream,
110+
transformFailure: { $0 }
111+
)
112+
}
113+
114+
override func receive(_ input: Upstream.Output) -> Subscribers.Demand {
115+
let mapped = transform(input)
116+
117+
lock.lock()
118+
if activePublisher == nil {
119+
lock.unlock()
120+
setActivePublisher(mapped)
121+
} else {
122+
lock.unlock()
123+
bufferedPublishers.append(mapped)
124+
}
125+
126+
return .unlimited
127+
}
128+
129+
private func setActivePublisher(_ publisher: NewPublisher) {
130+
lock.lock()
131+
defer { lock.unlock() }
132+
activePublisher = publisher
133+
134+
publisher.sink(
135+
receiveCompletion: { completion in
136+
switch completion {
137+
case .finished:
138+
self.lock.lock()
139+
guard let next = self.bufferedPublishers.first else {
140+
self.lock.unlock()
141+
return
142+
}
143+
self.bufferedPublishers.removeFirst()
144+
self.lock.unlock()
145+
self.setActivePublisher(next)
146+
case .failure(let error):
147+
self.receive(completion: .failure(error))
148+
}
149+
},
150+
receiveValue: { value in
151+
_ = self.buffer.buffer(value: value)
152+
}
153+
)
154+
.store(in: &cancellables)
155+
}
156+
}
157+
}
158+
159+
@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
160+
extension Publishers.ConcatMap.Subscription: CustomStringConvertible {
161+
var description: String {
162+
return "ConcatMap.Subscription<\(Downstream.Input.self), \(Downstream.Failure.self)>"
25163
}
26164
}
27165
#endif

Tests/ConcatMapTests.swift

+8-2
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,11 @@ final class ConcatMapTests: XCTestCase {
2626

2727
func test_publishes_values_inOrder() {
2828
var receivedValues = [Int]()
29-
let expectedValues = [1, 2, 3, 4]
29+
let expectedValues = [1, 2, 4, 5, 6]
3030

3131
let firstPublisher = P()
3232
let secondPublisher = P()
33+
let thirdPublisher = P()
3334

3435
let sut = PassthroughSubject<P, TestError>()
3536

@@ -42,15 +43,20 @@ final class ConcatMapTests: XCTestCase {
4243

4344
sut.send(firstPublisher)
4445
sut.send(secondPublisher)
46+
sut.send(thirdPublisher)
4547

4648
firstPublisher.send(1)
4749
firstPublisher.send(2)
50+
// values sent onto the second publisher will be ignored as long as the first publisher hasn't completed
51+
secondPublisher.send(3)
4852
firstPublisher.send(completion: .finished)
4953

50-
secondPublisher.send(3)
5154
secondPublisher.send(4)
55+
secondPublisher.send(5)
5256
secondPublisher.send(completion: .finished)
5357

58+
thirdPublisher.send(6)
59+
5460
XCTAssertEqual(expectedValues, receivedValues)
5561
}
5662

0 commit comments

Comments
 (0)