Skip to content

Commit 1f30938

Browse files
committed
Amb: cancel the publisher that looses the race immediately
1 parent 892b818 commit 1f30938

File tree

2 files changed

+42
-12
lines changed

2 files changed

+42
-12
lines changed

Sources/Operators/Amb.swift

+2
Original file line numberDiff line numberDiff line change
@@ -103,8 +103,10 @@ private extension Publishers.Amb {
103103
guard let decision = decision else { return }
104104
switch decision {
105105
case .first:
106+
secondSink?.cancelUpstream()
106107
secondSink = nil
107108
case .second:
109+
firstSink?.cancelUpstream()
108110
firstSink = nil
109111
}
110112

Tests/AmbTests.swift

+40-12
Original file line numberDiff line numberDiff line change
@@ -48,34 +48,62 @@ class AmbTests: XCTestCase {
4848
}
4949

5050
func testAmbCancelPreSubscription() {
51-
enum CancelError: Swift.Error {
52-
case cancelled
53-
}
54-
var ambPublisher: AnyCancellable?
51+
let ambPublisher: AnyCancellable?
5552

56-
var firstCompletion: Subscribers.Completion<CancelError>?
57-
let subject1 = PassthroughSubject<Int, CancelError>()
53+
let subject1Cancelled = expectation(description: "first publisher cancelled")
54+
let subject1 = PassthroughSubject<Int, Error>()
5855
let subject1Publisher = subject1
5956
.handleEvents(receiveCancel: {
60-
firstCompletion = .failure(CancelError.cancelled)
57+
subject1Cancelled.fulfill()
6158
})
6259
.eraseToAnyPublisher()
6360

64-
var secondCompletion: Subscribers.Completion<CancelError>?
65-
let subject2 = PassthroughSubject<Int, CancelError>()
61+
let subject2Cancelled = expectation(description: "second publisher cancelled")
62+
let subject2 = PassthroughSubject<Int, Error>()
6663
let subject2Publisher = subject2
6764
.handleEvents(receiveCancel: {
68-
secondCompletion = .failure(CancelError.cancelled)
65+
subject2Cancelled.fulfill()
6966
})
7067
.eraseToAnyPublisher()
7168

7269
ambPublisher = Publishers.Amb(first: subject1Publisher, second: subject2Publisher)
7370
.sink(receiveCompletion: { _ in },
7471
receiveValue: { _ in })
72+
73+
// cancelling amb should cancel the inner publishers
7574
ambPublisher?.cancel()
7675

77-
XCTAssertEqual(firstCompletion, .failure(CancelError.cancelled))
78-
XCTAssertEqual(secondCompletion, .failure(CancelError.cancelled))
76+
waitForExpectations(timeout: 0.01)
77+
}
78+
79+
func testAmbCancelPostSubscription() {
80+
let subject1 = PassthroughSubject<Int, Error>()
81+
var subject1cancelCounter = 0
82+
let subject1Publisher = subject1
83+
.handleEvents(receiveCancel: {
84+
subject1cancelCounter += 1
85+
})
86+
.eraseToAnyPublisher()
87+
88+
let subject2 = PassthroughSubject<Int, Error>()
89+
var subject2cancelCounter = 0
90+
let subject2Publisher = subject2
91+
.handleEvents(receiveCancel: {
92+
subject2cancelCounter += 1
93+
})
94+
.eraseToAnyPublisher()
95+
96+
Publishers.Amb(first: subject1Publisher, second: subject2Publisher)
97+
.sink(receiveCompletion: { _ in },
98+
receiveValue: { _ in })
99+
.store(in: &subscriptions)
100+
101+
// subject1 wins the race, so 2 has to be cancelled
102+
subject1.send(1)
103+
104+
// At dealloc both publishes are cancelled, so we cannot use expectations here and count the cancel events instead
105+
XCTAssertEqual(subject1cancelCounter, 0)
106+
XCTAssertEqual(subject2cancelCounter, 1)
79107
}
80108

81109
func testAmbLimitedPreDemand() {

0 commit comments

Comments
 (0)