Skip to content

Commit 5875335

Browse files
marius-seMordil
authored andcommitted
Add onUnexpectedConnectionClose callback to pool
Backports `onUnexpectedConnectionClose` on the pool to 1.x
1 parent 3bd5940 commit 5875335

File tree

3 files changed

+83
-75
lines changed

3 files changed

+83
-75
lines changed

Sources/RediStack/Configuration.swift

+16-12
Original file line numberDiff line numberDiff line change
@@ -27,20 +27,20 @@ extension RedisConnection.Configuration {
2727
var localizedDescription: String { self.kind.localizedDescription }
2828

2929
private let kind: Kind
30-
30+
3131
private init(_ kind: Kind) { self.kind = kind }
32-
32+
3333
public static func ==(lhs: ValidationError, rhs: ValidationError) -> Bool {
3434
return lhs.kind == rhs.kind
3535
}
36-
36+
3737
private enum Kind: LocalizedError {
3838
case invalidURLString
3939
case missingURLScheme
4040
case invalidURLScheme
4141
case missingURLHost
4242
case outOfBoundsDatabaseID
43-
43+
4444
var localizedDescription: String {
4545
let message: String = {
4646
switch self {
@@ -66,7 +66,7 @@ extension RedisConnection {
6666
///
6767
/// See [https://redis.io/topics/quickstart](https://redis.io/topics/quickstart)
6868
public static var defaultPort = 6379
69-
69+
7070
internal static let defaultLogger = Logger.redisBaseConnectionLogger
7171

7272
/// The hostname of the connection address. If the address is a Unix socket, then it will be `nil`.
@@ -85,9 +85,9 @@ extension RedisConnection {
8585
public let initialDatabase: Int?
8686
/// The logger prototype that will be used by the connection by default when generating logs.
8787
public let defaultLogger: Logger
88-
88+
8989
internal let address: SocketAddress
90-
90+
9191
/// Creates a new connection configuration with the provided details.
9292
/// - Parameters:
9393
/// - address: The socket address information to use for creating the Redis connection.
@@ -106,7 +106,7 @@ extension RedisConnection {
106106
if initialDatabase != nil && initialDatabase! < 0 {
107107
throw ValidationError.outOfBoundsDatabaseID
108108
}
109-
109+
110110
self.address = address
111111
self.password = password
112112
self.initialDatabase = initialDatabase
@@ -182,9 +182,9 @@ extension RedisConnection {
182182
try Self.validateRedisURL(url)
183183

184184
guard let host = url.host, !host.isEmpty else { throw ValidationError.missingURLHost }
185-
185+
186186
let databaseID = Int(url.lastPathComponent)
187-
187+
188188
try self.init(
189189
address: try .makeAddressResolvingHost(host, port: url.port ?? Self.defaultPort),
190190
password: url.password,
@@ -219,7 +219,7 @@ extension RedisConnectionPool {
219219
public let connectionInitialDatabase: Int?
220220
/// The pre-configured TCP client for connections to use.
221221
public let tcpClient: ClientBootstrap?
222-
222+
223223
/// Creates a new connection factory configuration with the provided options.
224224
/// - Parameters:
225225
/// - connectionInitialDatabase: The optional database index to initially connect to. The default is `nil`.
@@ -255,11 +255,13 @@ extension RedisConnectionPool {
255255
public let maximumConnectionCount: RedisConnectionPoolSize
256256
/// The configuration object that controls the connection retry behavior.
257257
public let connectionRetryConfiguration: (backoff: (initialDelay: TimeAmount, factor: Float32), timeout: TimeAmount)
258+
/// Called when a connection in the pool is closed unexpectedly.
259+
public let onUnexpectedConnectionClose: ((RedisConnection) -> Void)?
258260
// these need to be var so they can be updated by the pool in some cases
259261
public internal(set) var factoryConfiguration: ConnectionFactoryConfiguration
260262
/// The logger prototype that will be used by the connection pool by default when generating logs.
261263
public internal(set) var poolDefaultLogger: Logger
262-
264+
263265
/// Creates a new connection configuration with the provided options.
264266
/// - Parameters:
265267
/// - initialServerConnectionAddresses: The set of Redis servers to which this pool is initially willing to connect.
@@ -284,6 +286,7 @@ extension RedisConnectionPool {
284286
connectionBackoffFactor: Float32 = 2,
285287
initialConnectionBackoffDelay: TimeAmount = .milliseconds(100),
286288
connectionRetryTimeout: TimeAmount? = .seconds(60),
289+
onUnexpectedConnectionClose: ((RedisConnection) -> Void)? = nil,
287290
poolDefaultLogger: Logger? = nil
288291
) {
289292
self.initialConnectionAddresses = initialServerConnectionAddresses
@@ -294,6 +297,7 @@ extension RedisConnectionPool {
294297
(initialConnectionBackoffDelay, connectionBackoffFactor),
295298
connectionRetryTimeout ?? .milliseconds(10) // always default to a baseline 10ms
296299
)
300+
self.onUnexpectedConnectionClose = onUnexpectedConnectionClose
297301
self.poolDefaultLogger = poolDefaultLogger ?? .redisBaseConnectionPoolLogger
298302
}
299303
}

Sources/RediStack/RedisConnection.swift

+39-39
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import NIO
2121
import NIOConcurrencyHelpers
2222

2323
extension RedisConnection {
24-
24+
2525
/// Creates a new connection with provided configuration and sychronization objects.
2626
///
2727
/// If you would like to specialize the `NIO.ClientBootstrap` that the connection communicates on, override the default by passing it in as `configuredTCPClient`.
@@ -55,7 +55,7 @@ extension RedisConnection {
5555
configuredTCPClient client: ClientBootstrap? = nil
5656
) -> EventLoopFuture<RedisConnection> {
5757
let client = client ?? .makeRedisTCPClient(group: eventLoop)
58-
58+
5959
var future = client
6060
.connect(to: config.address)
6161
.map { return RedisConnection(configuredRESPChannel: $0, context: config.defaultLogger) }
@@ -73,7 +73,7 @@ extension RedisConnection {
7373
return connection.select(database: database).map { connection }
7474
}
7575
}
76-
76+
7777
return future
7878
}
7979
}
@@ -157,14 +157,14 @@ public final class RedisConnection: RedisClient, RedisClientWithUserContext {
157157
get { return _stateLock.withLock { self._state } }
158158
set(newValue) { _stateLock.withLockVoid { self._state = newValue } }
159159
}
160-
160+
161161
deinit {
162162
if isConnected {
163163
assertionFailure("close() was not called before deinit!")
164164
self.logger.warning("connection was not properly shutdown before deinit")
165165
}
166166
}
167-
167+
168168
internal init(configuredRESPChannel: Channel, context: Context) {
169169
self.channel = configuredRESPChannel
170170
// there is a mix of verbiage here as the API is forward thinking towards "baggage context"
@@ -176,14 +176,14 @@ public final class RedisConnection: RedisClient, RedisClientWithUserContext {
176176

177177
RedisMetrics.activeConnectionCount.increment()
178178
RedisMetrics.totalConnectionCount.increment()
179-
179+
180180
// attach a callback to the channel to capture situations where the channel might be closed out from under
181181
// the connection
182182
self.channel.closeFuture.whenSuccess {
183183
// if our state is still open, that means we didn't cause the closeFuture to resolve.
184184
// update state, metrics, and logging
185185
guard self.state.isConnected else { return }
186-
186+
187187
self.state = .closed
188188
self.logger.error("connection was closed unexpectedly")
189189
RedisMetrics.activeConnectionCount.decrement()
@@ -192,13 +192,13 @@ public final class RedisConnection: RedisClient, RedisClientWithUserContext {
192192

193193
self.logger.trace("connection created")
194194
}
195-
195+
196196
internal enum ConnectionState {
197197
case open
198198
case pubsub(RedisPubSubHandler)
199199
case shuttingDown
200200
case closed
201-
201+
202202
var isConnected: Bool {
203203
switch self {
204204
case .open, .pubsub: return true
@@ -242,40 +242,40 @@ extension RedisConnection {
242242
return self.channel.eventLoop.makeFailedFuture(error)
243243
}
244244
logger.trace("received command request")
245-
245+
246246
logger.debug("sending command", metadata: [
247247
RedisLogging.MetadataKeys.commandKeyword: "\(command)",
248248
RedisLogging.MetadataKeys.commandArguments: "\(arguments)"
249249
])
250-
250+
251251
var message: [RESPValue] = [.init(bulk: command)]
252252
message.append(contentsOf: arguments)
253-
253+
254254
let promise = channel.eventLoop.makePromise(of: RESPValue.self)
255255
let command = RedisCommand(
256256
message: .array(message),
257257
responsePromise: promise
258258
)
259-
259+
260260
let startTime = DispatchTime.now().uptimeNanoseconds
261261
promise.futureResult.whenComplete { result in
262262
let duration = DispatchTime.now().uptimeNanoseconds - startTime
263263
RedisMetrics.commandRoundTripTime.recordNanoseconds(duration)
264-
264+
265265
// log data based on the result
266266
switch result {
267267
case let .failure(error):
268268
logger.error("command failed", metadata: [
269269
RedisLogging.MetadataKeys.error: "\(error.localizedDescription)"
270270
])
271-
271+
272272
case let .success(value):
273273
logger.debug("command succeeded", metadata: [
274274
RedisLogging.MetadataKeys.commandResult: "\(value)"
275275
])
276276
}
277277
}
278-
278+
279279
defer { logger.trace("command sent") }
280280

281281
if self.sendCommandsImmediately {
@@ -310,10 +310,10 @@ extension RedisConnection {
310310

311311
// we're now in a shutdown state, starting with the command queue.
312312
self.state = .shuttingDown
313-
313+
314314
let notification = self.sendQuitCommand(logger: logger) // send "QUIT" so that all the responses are written out
315315
.flatMap { self.closeChannel() } // close the channel from our end
316-
316+
317317
notification.whenFailure {
318318
logger.error("error while closing connection", metadata: [
319319
RedisLogging.MetadataKeys.error: "\($0)"
@@ -324,10 +324,10 @@ extension RedisConnection {
324324
logger.trace("connection is now closed")
325325
RedisMetrics.activeConnectionCount.decrement()
326326
}
327-
327+
328328
return notification
329329
}
330-
330+
331331
/// Bypasses everything for a normal command and explicitly just sends a "QUIT" command to Redis.
332332
/// - Note: If the command fails, the `NIO.EventLoopFuture` will still succeed - as it's not critical for the command to succeed.
333333
private func sendQuitCommand(logger: Logger) -> EventLoopFuture<Void> {
@@ -344,22 +344,22 @@ extension RedisConnection {
344344
.map { _ in logger.trace("sent QUIT command") } // ignore the result's value
345345
.recover { _ in logger.debug("recovered from error sending QUIT") } // if there's an error, just return to void
346346
}
347-
347+
348348
/// Attempts to close the `NIO.Channel`.
349349
/// SwiftNIO throws a `NIO.EventLoopError.shutdown` if the channel is already closed,
350350
/// so that case is captured to let this method's `NIO.EventLoopFuture` still succeed.
351351
private func closeChannel() -> EventLoopFuture<Void> {
352352
let promise = self.channel.eventLoop.makePromise(of: Void.self)
353-
353+
354354
self.channel.close(promise: promise)
355-
355+
356356
// if we succeed, great, if not - check the error that happened
357357
return promise.futureResult
358358
.flatMapError { error in
359359
guard let e = error as? EventLoopError else {
360360
return self.eventLoop.makeFailedFuture(error)
361361
}
362-
362+
363363
// if the error is that the channel is already closed, great - just succeed.
364364
// otherwise, fail the chain
365365
switch e {
@@ -395,7 +395,7 @@ extension RedisConnection {
395395
) -> EventLoopFuture<Void> {
396396
return self._subscribe(.channels(channels), receiver, subscribeHandler, unsubscribeHandler, nil)
397397
}
398-
398+
399399
public func psubscribe(
400400
to patterns: [String],
401401
messageReceiver receiver: @escaping RedisSubscriptionMessageReceiver,
@@ -404,7 +404,7 @@ extension RedisConnection {
404404
) -> EventLoopFuture<Void> {
405405
return self._subscribe(.patterns(patterns), receiver, subscribeHandler, unsubscribeHandler, nil)
406406
}
407-
407+
408408
internal func subscribe(
409409
to channels: [RedisChannelName],
410410
messageReceiver receiver: @escaping RedisSubscriptionMessageReceiver,
@@ -414,7 +414,7 @@ extension RedisConnection {
414414
) -> EventLoopFuture<Void> {
415415
return self._subscribe(.channels(channels), receiver, subscribeHandler, unsubscribeHandler, context)
416416
}
417-
417+
418418
internal func psubscribe(
419419
to patterns: [String],
420420
messageReceiver receiver: @escaping RedisSubscriptionMessageReceiver,
@@ -424,7 +424,7 @@ extension RedisConnection {
424424
) -> EventLoopFuture<Void> {
425425
return self._subscribe(.patterns(patterns), receiver, subscribeHandler, unsubscribeHandler, context)
426426
}
427-
427+
428428
private func _subscribe(
429429
_ target: RedisSubscriptionTarget,
430430
_ receiver: @escaping RedisSubscriptionMessageReceiver,
@@ -433,9 +433,9 @@ extension RedisConnection {
433433
_ logger: Logger?
434434
) -> EventLoopFuture<Void> {
435435
let logger = self.prepareLoggerForUse(logger)
436-
436+
437437
logger.trace("received subscribe request")
438-
438+
439439
// if we're closed, just error out
440440
guard self.state.isConnected else { return self.eventLoop.makeFailedFuture(RedisClientError.connectionClosed) }
441441

@@ -483,7 +483,7 @@ extension RedisConnection {
483483
logger.debug("the connection is now in pubsub mode")
484484
}
485485
}
486-
486+
487487
// add the subscription and just ignore the subscription count
488488
return handler
489489
.addSubscription(for: target, messageReceiver: receiver, onSubscribe: onSubscribe, onUnsubscribe: onUnsubscribe)
@@ -497,27 +497,27 @@ extension RedisConnection {
497497
public func unsubscribe(from channels: [RedisChannelName]) -> EventLoopFuture<Void> {
498498
return self._unsubscribe(.channels(channels), nil)
499499
}
500-
500+
501501
public func punsubscribe(from patterns: [String]) -> EventLoopFuture<Void> {
502502
return self._unsubscribe(.patterns(patterns), nil)
503503
}
504-
504+
505505
internal func unsubscribe(from channels: [RedisChannelName], context: Context?) -> EventLoopFuture<Void> {
506506
return self._unsubscribe(.channels(channels), context)
507507
}
508-
508+
509509
internal func punsubscribe(from patterns: [String], context: Context?) -> EventLoopFuture<Void> {
510510
return self._unsubscribe(.patterns(patterns), context)
511511
}
512-
512+
513513
private func _unsubscribe(_ target: RedisSubscriptionTarget, _ logger: Logger?) -> EventLoopFuture<Void> {
514514
let logger = self.prepareLoggerForUse(logger)
515-
515+
516516
logger.trace("received unsubscribe request")
517517

518518
// if we're closed, just error out
519519
guard self.state.isConnected else { return self.eventLoop.makeFailedFuture(RedisClientError.connectionClosed) }
520-
520+
521521
// if we're not in pubsub mode, then we just succeed as a no-op
522522
guard case let .pubsub(handler) = self.state else {
523523
// but we still assert just to give some notification to devs at debug
@@ -526,11 +526,11 @@ extension RedisConnection {
526526
])
527527
return self.eventLoop.makeSucceededFuture(())
528528
}
529-
529+
530530
logger.trace("removing subscription", metadata: [
531531
RedisLogging.MetadataKeys.pubsubTarget: "\(target.debugDescription)"
532532
])
533-
533+
534534
// remove the subscription
535535
return handler.removeSubscription(for: target)
536536
.flatMap {

0 commit comments

Comments
 (0)