Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
.DS_Store
/.build
/.index-build
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why have you added this one?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is like a .build directory but for sourcekit-lsp.
In the sourcekit-lsp bundled with swift 6.1 it'll be moved into .build dir itself, but that's not out.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you need to have Swift 6.0.x and have experimental-indexing enabled (e.g. in the VSCode extension or using the sourcekit-lsp config file).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

/Packages
/*.xcodeproj
DerivedData
Expand Down
96 changes: 96 additions & 0 deletions Sources/PostgresNIO/Connection/PostgresConnection.swift
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,56 @@ extension PostgresConnection {
}
}

/// Run a query on the Postgres server the connection is connected to, returning the metadata.
///
/// - Parameters:
/// - query: The ``PostgresQuery`` to run
/// - logger: The `Logger` to log into for the query
/// - file: The file, the query was started in. Used for better error reporting.
/// - line: The line, the query was started in. Used for better error reporting.
/// - consume: The closure to consume the ``PostgresRowSequence``.
/// DO NOT escape the row-sequence out of the closure.
/// - Returns: The result of the `consume` closure as well as the query metadata.
public func query<Result>(
_ query: PostgresQuery,
logger: Logger,
file: String = #fileID,
line: Int = #line,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@FranzBusch what is the #isolation magic that we need here?

_ consume: (PostgresRowSequence) async throws -> Result
) async throws -> (Result, PostgresQueryMetadata) {
var logger = logger
logger[postgresMetadataKey: .connectionID] = "\(self.id)"

guard query.binds.count <= Int(UInt16.max) else {
throw PSQLError(code: .tooManyParameters, query: query, file: file, line: line)
}
let promise = self.channel.eventLoop.makePromise(of: PSQLRowStream.self)
let context = ExtendedQueryContext(
query: query,
logger: logger,
promise: promise
)

self.channel.write(HandlerTask.extendedQuery(context), promise: nil)

do {
let (rowStream, rowSequence) = try await promise.futureResult.map { rowStream in
(rowStream, rowStream.asyncSequence())
}.get()
let result = try await consume(rowSequence)
try await rowStream.drain().get()
guard let metadata = PostgresQueryMetadata(string: rowStream.commandTag) else {
throw PSQLError.invalidCommandTag(rowStream.commandTag)
}
return (result, metadata)
} catch var error as PSQLError {
error.file = file
error.line = line
error.query = query
throw error // rethrow with more metadata
}
}

/// Start listening for a channel
public func listen(_ channel: String) async throws -> PostgresNotificationSequence {
let id = self.internalListenID.loadThenWrappingIncrement(ordering: .relaxed)
Expand Down Expand Up @@ -531,6 +581,52 @@ extension PostgresConnection {
}
}

/// Execute a statement on the Postgres server the connection is connected to,
/// returning the metadata.
///
/// - Parameters:
/// - query: The ``PostgresQuery`` to run
/// - logger: The `Logger` to log into for the query
/// - file: The file, the query was started in. Used for better error reporting.
/// - line: The line, the query was started in. Used for better error reporting.
/// - Returns: The query metadata.
@discardableResult
public func execute(
_ query: PostgresQuery,
logger: Logger,
file: String = #fileID,
line: Int = #line
) async throws -> PostgresQueryMetadata {
var logger = logger
logger[postgresMetadataKey: .connectionID] = "\(self.id)"

guard query.binds.count <= Int(UInt16.max) else {
throw PSQLError(code: .tooManyParameters, query: query, file: file, line: line)
}
let promise = self.channel.eventLoop.makePromise(of: PSQLRowStream.self)
let context = ExtendedQueryContext(
query: query,
logger: logger,
promise: promise
)

self.channel.write(HandlerTask.extendedQuery(context), promise: nil)

do {
let rowStream = try await promise.futureResult.get()
try await rowStream.drain().get()
guard let metadata = PostgresQueryMetadata(string: rowStream.commandTag) else {
throw PSQLError.invalidCommandTag(rowStream.commandTag)
}
return metadata
} catch var error as PSQLError {
error.file = file
error.line = line
error.query = query
throw error // rethrow with more metadata
}
}

#if compiler(>=6.0)
/// Puts the connection into an open transaction state, for the provided `closure`'s lifetime.
///
Expand Down
65 changes: 64 additions & 1 deletion Sources/PostgresNIO/New/PSQLRowStream.swift
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,70 @@ final class PSQLRowStream: @unchecked Sendable {
return self.eventLoop.makeFailedFuture(error)
}
}


// MARK: Drain on EventLoop

func drain() -> EventLoopFuture<Void> {
if self.eventLoop.inEventLoop {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is already implemented with cancel.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cancel is not enough:

1- it doesn't wait until the query is over so metadata is available.
If you try to get the metadata too soon, the PostgresNIO code will crash.

2- it doesn't cover all cases that might happen. For example when downstream is waiting for consumer.
That might happen when the user hasn't even tried to consume the rows.

return self.drain0()
} else {
return self.eventLoop.flatSubmit {
self.drain0()
}
}
}

private func drain0() -> EventLoopFuture<Void> {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is already implemented with cancel0.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See the other comment.

self.eventLoop.preconditionInEventLoop()

switch self.downstreamState {
case .waitingForConsumer(let bufferState):
switch bufferState {
case .streaming(var buffer, let dataSource):
let promise = self.eventLoop.makePromise(of: [PostgresRow].self)

buffer.removeAll()
self.downstreamState = .waitingForAll([], promise, dataSource)
// immediately request more
dataSource.request(for: self)

return promise.futureResult.map { _ in }

case .finished(_, let summary):
self.downstreamState = .consumed(.success(summary))
return self.eventLoop.makeSucceededVoidFuture()

case .failure(let error):
self.downstreamState = .consumed(.failure(error))
return self.eventLoop.makeFailedFuture(error)
}
case .asyncSequence(let consumer, let dataSource, let onFinish):
consumer.finish()
onFinish()

let promise = self.eventLoop.makePromise(of: [PostgresRow].self)

self.downstreamState = .waitingForAll([], promise, dataSource)
// immediately request more
dataSource.request(for: self)

return promise.futureResult.map { _ in }
case .consumed(.success):
// already drained
return self.eventLoop.makeSucceededVoidFuture()
case .consumed(let .failure(error)):
return self.eventLoop.makeFailedFuture(error)
case .waitingForAll(let rows, let promise, let dataSource):
self.downstreamState = .waitingForAll(rows, promise, dataSource)
// immediately request more
dataSource.request(for: self)

return promise.futureResult.map { _ in }
default:
preconditionFailure("Invalid state: \(self.downstreamState)")
}
}

internal func noticeReceived(_ notice: PostgresBackendMessage.NoticeResponse) {
self.logger.debug("Notice Received", metadata: [
.notice: "\(notice)"
Expand Down
2 changes: 2 additions & 0 deletions Sources/PostgresNIO/New/PostgresRowSequence.swift
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ extension PostgresRowSequence {
extension PostgresRowSequence.AsyncIterator: Sendable {}

extension PostgresRowSequence {
/// Collects all rows into an array.
/// - Returns: The rows.
public func collect() async throws -> [PostgresRow] {
var result = [PostgresRow]()
for try await row in self {
Expand Down
55 changes: 55 additions & 0 deletions Sources/PostgresNIO/Pool/PostgresClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,61 @@ public final class PostgresClient: Sendable, ServiceLifecycle.Service {
}
}

/// Run a query on the Postgres server the connection is connected to, returning the metadata.
///
/// - Parameters:
/// - query: The ``PostgresQuery`` to run
/// - logger: The `Logger` to log into for the query
/// - file: The file, the query was started in. Used for better error reporting.
/// - line: The line, the query was started in. Used for better error reporting.
/// - consume: The closure to consume the ``PostgresRowSequence``.
/// DO NOT escape the row-sequence out of the closure.
/// - Returns: The result of the `consume` closure as well as the query metadata.
public func query<Result>(
_ query: PostgresQuery,
logger: Logger? = nil,
file: String = #fileID,
line: Int = #line,
_ consume: (PostgresRowSequence) async throws -> Result
) async throws -> (Result, PostgresQueryMetadata) {
let logger = logger ?? Self.loggingDisabled

do {
guard query.binds.count <= Int(UInt16.max) else {
throw PSQLError(code: .tooManyParameters, query: query, file: file, line: line)
}

let connection = try await self.leaseConnection()

var logger = logger
logger[postgresMetadataKey: .connectionID] = "\(connection.id)"

let promise = connection.channel.eventLoop.makePromise(of: PSQLRowStream.self)
let context = ExtendedQueryContext(
query: query,
logger: logger,
promise: promise
)

connection.channel.write(HandlerTask.extendedQuery(context), promise: nil)

let (rowStream, rowSequence) = try await promise.futureResult.map { rowStream in
(rowStream, rowStream.asyncSequence(onFinish: { self.pool.releaseConnection(connection) }))
}.get()
let result = try await consume(rowSequence)
try await rowStream.drain().get()
guard let metadata = PostgresQueryMetadata(string: rowStream.commandTag) else {
throw PSQLError.invalidCommandTag(rowStream.commandTag)
}
return (result, metadata)
} catch var error as PSQLError {
error.file = file
error.line = line
error.query = query
throw error // rethrow with more metadata
}
}

/// Execute a prepared statement, taking care of the preparation when necessary
public func execute<Statement: PostgresPreparedStatement, Row>(
_ preparedStatement: Statement,
Expand Down
103 changes: 95 additions & 8 deletions Tests/IntegrationTests/AsyncTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,98 @@ final class AsyncPostgresConnectionTests: XCTestCase {
}
}

func testSelect10kRowsWithMetadata() async throws {
let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
defer { XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) }
let eventLoop = eventLoopGroup.next()

let start = 1
let end = 10000

try await withTestConnection(on: eventLoop) { connection in
let (result, metadata) = try await connection.query(
"SELECT generate_series(\(start), \(end));",
logger: .psqlTest
) { rows in
var counter = 0
for try await row in rows {
let element = try row.decode(Int.self)
XCTAssertEqual(element, counter + 1)
counter += 1
}
return counter
}

XCTAssertEqual(metadata.command, "SELECT")
XCTAssertEqual(metadata.oid, nil)
XCTAssertEqual(metadata.rows, end)

XCTAssertEqual(result, end)
}
}

func testSelectRowsWithMetadataNotConsumedAtAll() async throws {
let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
defer { XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) }
let eventLoop = eventLoopGroup.next()

let start = 1
let end = 10000

try await withTestConnection(on: eventLoop) { connection in
let (_, metadata) = try await connection.query(
"SELECT generate_series(\(start), \(end));",
logger: .psqlTest
) { _ in }

XCTAssertEqual(metadata.command, "SELECT")
XCTAssertEqual(metadata.oid, nil)
XCTAssertEqual(metadata.rows, end)
}
}

func testSelectRowsWithMetadataNotFullyConsumed() async throws {
let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
defer { XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) }
let eventLoop = eventLoopGroup.next()

try await withTestConnection(on: eventLoop) { connection in
do {
_ = try await connection.query(
"SELECT generate_series(1, 10000);",
logger: .psqlTest
) { rows in
for try await _ in rows { break }
}
// This path is also fine
} catch is CancellationError {
// Expected
} catch {
XCTFail("Expected 'CancellationError', got: \(String(reflecting: error))")
}
}
}

func testExecuteRowsWithMetadata() async throws {
let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
defer { XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) }
let eventLoop = eventLoopGroup.next()

let start = 1
let end = 10000

try await withTestConnection(on: eventLoop) { connection in
let metadata = try await connection.execute(
"SELECT generate_series(\(start), \(end));",
logger: .psqlTest
)

XCTAssertEqual(metadata.command, "SELECT")
XCTAssertEqual(metadata.oid, nil)
XCTAssertEqual(metadata.rows, end)
}
}

func testSelectActiveConnection() async throws {
let eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
defer { XCTAssertNoThrow(try eventLoopGroup.syncShutdownGracefully()) }
Expand Down Expand Up @@ -207,7 +299,7 @@ final class AsyncPostgresConnectionTests: XCTestCase {

try await withTestConnection(on: eventLoop) { connection in
// Max binds limit is UInt16.max which is 65535 which is 3 * 5 * 17 * 257
// Max columns limit is 1664, so we will only make 5 * 257 columns which is less
// Max columns limit appears to be ~1600, so we will only make 5 * 257 columns which is less
// Then we will insert 3 * 17 rows
// In the insertion, there will be a total of 3 * 17 * 5 * 257 == UInt16.max bindings
// If the test is successful, it means Postgres supports UInt16.max bindings
Expand Down Expand Up @@ -241,13 +333,8 @@ final class AsyncPostgresConnectionTests: XCTestCase {
unsafeSQL: "INSERT INTO table1 VALUES \(insertionValues)",
binds: binds
)
try await connection.query(insertionQuery, logger: .psqlTest)

let countQuery = PostgresQuery(unsafeSQL: "SELECT COUNT(*) FROM table1")
let countRows = try await connection.query(countQuery, logger: .psqlTest)
var countIterator = countRows.makeAsyncIterator()
let insertedRowsCount = try await countIterator.next()?.decode(Int.self, context: .default)
XCTAssertEqual(rowsCount, insertedRowsCount)
let metadata = try await connection.execute(insertionQuery, logger: .psqlTest)
XCTAssertEqual(metadata.rows, rowsCount)
Comment on lines +336 to +337
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a test i originally wrote myself in another PR. Moved it to use query metadata since that's what should have been done anyway.


let dropQuery = PostgresQuery(unsafeSQL: "DROP TABLE table1")
try await connection.query(dropQuery, logger: .psqlTest)
Expand Down
Loading
Loading