-
Notifications
You must be signed in to change notification settings - Fork 160
/
Copy pathDemandBuffer.swift
156 lines (138 loc) · 5.27 KB
/
DemandBuffer.swift
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
//
// DemandBuffer.swift
// CombineExt
//
// Created by Shai Mishali on 21/02/2020.
// Copyright © 2020 Combine Community. All rights reserved.
//
#if canImport(Combine)
import Combine
import class Foundation.NSRecursiveLock
/// A buffer responsible for managing the demand of a downstream
/// subscriber for an upstream publisher
///
/// It buffers values and completion events and forwards them dynamically
/// according to the demand requested by the downstream
///
/// In a sense, the subscription only relays the requests for demand, as well
/// as the events emitted by the upstream — to this buffer, which manages
/// the entire behavior and backpressure contract
///
/// - note: Every access to the buffer's mutable state (`buffer`, `completion`
///         and `demandState`) is performed while holding `lock`. The lock is
///         recursive, so a downstream subscriber may call back into this
///         buffer on the same thread while it is receiving a value.
@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
class DemandBuffer<S: Subscriber> {
    /// Guards `buffer`, `completion` and `demandState`
    private let lock = NSRecursiveLock()
    /// Upstream values that have not been handed to the subscriber yet
    private var buffer = [S.Input]()
    private let subscriber: S
    /// The upstream completion, stored until a flush forwards it downstream
    private var completion: Subscribers.Completion<S.Failure>?
    private var demandState = Demand()

    /// The remaining demand, i.e. the requested demand that has not yet been
    /// returned from a flush (`requested - sent`)
    var remainingDemand: Subscribers.Demand {
        lock.lock()
        defer { lock.unlock() }

        return demandState.requested - demandState.sent
    }

    /// Initialize a new demand buffer for a provided downstream subscriber
    ///
    /// - parameter subscriber: The downstream subscriber demanding events
    init(subscriber: S) {
        self.subscriber = subscriber
    }

    /// Buffer an upstream value to later be forwarded to
    /// the downstream subscriber, once it demands it
    ///
    /// When the accumulated demand is `.unlimited` the value bypasses the
    /// buffer and is handed to the subscriber directly.
    ///
    /// - parameter value: Upstream value to buffer
    ///
    /// - returns: The demand fulfilled by the buffer
    func buffer(value: S.Input) -> Subscribers.Demand {
        // Take the lock before inspecting any state: `completion`,
        // `demandState` and `buffer` are all mutated by `flush` under the
        // same lock, so reading or appending without it would be a data race.
        lock.lock()
        defer { lock.unlock() }

        precondition(self.completion == nil,
                     "How could a completed publisher sent values?! Beats me 🤷♂️")

        switch demandState.requested {
        case .unlimited:
            return subscriber.receive(value)
        default:
            buffer.append(value)
            // Re-entering the recursive lock inside `flush` is safe.
            return flush()
        }
    }

    /// Complete the demand buffer with an upstream completion event
    ///
    /// The completion is stored and a flush is attempted right away. A flush
    /// that runs forwards as many buffered values as the accumulated demand
    /// allows, then relays the completion and discards whatever is left in
    /// the buffer. If no demand has been requested yet, the completion stays
    /// stored until a later flush.
    ///
    /// - parameter completion: Completion event
    func complete(completion: Subscribers.Completion<S.Failure>) {
        // Same lock as `flush`, so the check-then-store below is atomic with
        // respect to concurrent flushes.
        lock.lock()
        defer { lock.unlock() }

        precondition(self.completion == nil,
                     "Completion have already occured, which is quite awkward 🥺")

        self.completion = completion
        _ = flush()
    }

    /// Signal to the buffer that the downstream requested new demand
    ///
    /// - parameter demand: The additional demand requested by the downstream
    ///
    /// - returns: The demand returned by the resulting flush
    ///
    /// - note: The buffer will attempt to flush as many events requested
    ///         by the downstream at this point
    func demand(_ demand: Subscribers.Demand) -> Subscribers.Demand {
        flush(adding: demand)
    }

    /// Flush buffered events to the downstream based on the current
    /// state of the downstream's demand
    ///
    /// - parameter newDemand: The new demand to add. If `nil`, the flush isn't the
    ///                        result of an explicit demand change
    ///
    /// - returns: The part of the requested demand that had not been returned
    ///            by an earlier flush, or `.none` when the flush bailed out
    ///            early or relayed a completion
    ///
    /// - note: After fulfilling the downstream's request, if completion
    ///         has already occurred, the buffer will be cleared and the
    ///         completion event will be sent to the downstream subscriber
    private func flush(adding newDemand: Subscribers.Demand? = nil) -> Subscribers.Demand {
        lock.lock()
        defer { lock.unlock() }

        if let newDemand = newDemand {
            demandState.requested += newDemand
        }

        // If buffer isn't ready for flushing, return immediately.
        // With no accumulated demand, only an explicit `.none` request proceeds.
        guard demandState.requested > 0 || newDemand == Subscribers.Demand.none else { return .none }

        while !buffer.isEmpty && demandState.processed < demandState.requested {
            // Pop the value before calling downstream, so a re-entrant flush
            // triggered from within `receive` never sees it again.
            let next = buffer.removeFirst()
            let additionalDemand = subscriber.receive(next)
            demandState.requested += additionalDemand
            demandState.processed += 1
        }

        if let completion = completion {
            // A completion is pending: reset all state first, then relay it
            buffer = []
            demandState = .init()
            self.completion = nil
            subscriber.receive(completion: completion)
            return .none
        }

        // Report the outstanding demand and record it as sent, so the same
        // demand is not returned again by the next flush.
        let sentDemand = remainingDemand
        demandState.sent += sentDemand
        return sentDemand
    }
}
// MARK: - Private Helpers
@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
private extension DemandBuffer {
    /// Bookkeeping for the downstream's demand, kept as three
    /// running totals that all start out at `.none`
    struct Demand {
        /// Count of buffered values already delivered to the subscriber
        var processed = Subscribers.Demand.none

        /// Total demand accumulated from the downstream so far
        var requested = Subscribers.Demand.none

        /// Part of the requested demand already returned from a flush
        var sent = Subscribers.Demand.none
    }
}
// MARK: - Internally-scoped helpers
@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
extension Subscription {
    /// Forward a demand to this subscription, skipping the call
    /// entirely when the demand is `.none`
    ///
    /// - parameter demand: Requested demand
    func requestIfNeeded(_ demand: Subscribers.Demand) {
        if demand > .none {
            request(demand)
        }
    }
}
@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
extension Optional where Wrapped == Subscription {
    /// Cancel the wrapped subscription, if there is one, and
    /// reset the optional to `nil`
    mutating func kill() {
        if case .some(let subscription) = self {
            subscription.cancel()
        }
        self = .none
    }
}
#endif