Skip to content

Commit a9d10f4

Browse files
committed
dbkit 1.0.0 gm; connection pipelining
1 parent a66e6bb commit a9d10f4

File tree

5 files changed

+99
-23
lines changed

5 files changed

+99
-23
lines changed

Sources/PostgreSQL/Connection/PostgreSQLConnection+Query.swift

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,9 @@ extension PostgreSQLConnection {
55
public func query(
66
_ string: String,
77
_ parameters: [PostgreSQLDataConvertible] = []
8-
) throws -> Future<[[PostgreSQLColumn: PostgreSQLData]]> {
8+
) -> Future<[[PostgreSQLColumn: PostgreSQLData]]> {
99
var rows: [[PostgreSQLColumn: PostgreSQLData]] = []
10-
return try query(string, parameters) { row in
10+
return query(string, parameters) { row in
1111
rows.append(row)
1212
}.map(to: [[PostgreSQLColumn: PostgreSQLData]].self) {
1313
return rows
@@ -21,6 +21,22 @@ extension PostgreSQLConnection {
2121
_ parameters: [PostgreSQLDataConvertible] = [],
2222
resultFormat: PostgreSQLResultFormat = .binary(),
2323
onRow: @escaping ([PostgreSQLColumn: PostgreSQLData]) throws -> ()
24+
) -> Future<Void> {
25+
return operation {
26+
do {
27+
return try self._query(string, parameters, resultFormat: resultFormat, onRow: onRow)
28+
} catch {
29+
return self.eventLoop.newFailedFuture(error: error)
30+
}
31+
}
32+
}
33+
34+
/// Non-operation bounded query.
35+
private func _query(
36+
_ string: String,
37+
_ parameters: [PostgreSQLDataConvertible] = [],
38+
resultFormat: PostgreSQLResultFormat = .binary(),
39+
onRow: @escaping ([PostgreSQLColumn: PostgreSQLData]) throws -> ()
2440
) throws -> Future<Void> {
2541
let parameters = try parameters.map { try $0.convertToPostgreSQLData() }
2642
let parse = PostgreSQLParseRequest(

Sources/PostgreSQL/Connection/PostgreSQLConnection+SimpleQuery.swift

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,14 @@ extension PostgreSQLConnection {
1010
return rows
1111
}
1212
}
13-
1413
/// Sends a simple PostgreSQL query command, returning the parsed results to
1514
/// the supplied closure.
1615
public func simpleQuery(_ string: String, onRow: @escaping ([PostgreSQLColumn: PostgreSQLData]) -> ()) -> Future<Void> {
16+
return operation { self._simpleQuery(string, onRow: onRow) }
17+
}
18+
19+
/// Non-operation bounded simple query.
20+
private func _simpleQuery(_ string: String, onRow: @escaping ([PostgreSQLColumn: PostgreSQLData]) -> ()) -> Future<Void> {
1721
logger?.log(query: string, parameters: [])
1822
var currentRow: PostgreSQLRowDescription?
1923
let query = PostgreSQLQuery(query: string)

Sources/PostgreSQL/Connection/PostgreSQLConnection.swift

Lines changed: 73 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
1-
import Async
21
import Crypto
32
import NIO
43

54
/// A PostgreSQL frontend client.
6-
public final class PostgreSQLConnection {
5+
public final class PostgreSQLConnection: DatabaseConnection, BasicWorker {
6+
/// See `BasicWorker`.
77
public var eventLoop: EventLoop {
88
return channel.eventLoop
99
}
@@ -17,6 +17,12 @@ public final class PostgreSQLConnection {
1717
/// If non-nil, will log queries.
1818
public var logger: PostgreSQLLogger?
1919

20+
/// See `DatabaseConnection`.
21+
public var isClosed: Bool
22+
23+
/// See `Extendable`.
24+
public var extend: Extend
25+
2026
/// Returns a new unique portal name.
2127
internal var nextPortalName: String {
2228
defer { uniqueNameCounter = uniqueNameCounter &+ 1 }
@@ -32,21 +38,54 @@ public final class PostgreSQLConnection {
3238
/// A unique identifier for this connection, used to generate statment and portal names
3339
private var uniqueNameCounter: UInt8
3440

41+
/// In-flight `send(...)` futures.
42+
private var currentSend: Promise<Void>?
43+
44+
/// The current query running, if one exists.
45+
private var pipeline: Future<Void>
46+
3547
/// Creates a new Redis client on the provided data source and sink.
3648
init(queue: QueueHandler<PostgreSQLMessage, PostgreSQLMessage>, channel: Channel) {
3749
self.queue = queue
3850
self.channel = channel
3951
self.uniqueNameCounter = 0
52+
self.isClosed = false
53+
self.extend = [:]
54+
self.pipeline = channel.eventLoop.newSucceededFuture(result: ())
55+
channel.closeFuture.always {
56+
self.isClosed = true
57+
if let current = self.currentSend {
58+
current.fail(error: closeError)
59+
}
60+
}
4061
}
41-
42-
deinit {
43-
close()
62+
/// Sends `PostgreSQLMessage` to the server.
63+
func send(_ message: [PostgreSQLMessage]) -> Future<[PostgreSQLMessage]> {
64+
var responses: [PostgreSQLMessage] = []
65+
return send(message) { response in
66+
responses.append(response)
67+
}.map(to: [PostgreSQLMessage].self) {
68+
return responses
69+
}
4470
}
4571

4672
/// Sends `PostgreSQLMessage` to the server.
4773
func send(_ messages: [PostgreSQLMessage], onResponse: @escaping (PostgreSQLMessage) throws -> ()) -> Future<Void> {
74+
// if currentSend is not nil, previous send has not completed
75+
assert(currentSend == nil, "Attempting to call `send(...)` again before previous invocation has completed.")
76+
77+
// ensure the connection is not closed
78+
guard !isClosed else {
79+
return eventLoop.newFailedFuture(error: closeError)
80+
}
81+
82+
// create a new promise and store it
83+
let promise = eventLoop.newPromise(Void.self)
84+
currentSend = promise
85+
86+
// cascade this enqueue to the newly created promise
4887
var error: Error?
49-
return queue.enqueue(messages) { message in
88+
queue.enqueue(messages) { message in
5089
switch message {
5190
case .readyForQuery:
5291
if let e = error { throw e }
@@ -56,17 +95,28 @@ public final class PostgreSQLConnection {
5695
default: try onResponse(message)
5796
}
5897
return false // request until ready for query
59-
}
98+
}.cascade(promise: promise)
99+
100+
// when the promise completes, remove the reference to it
101+
promise.futureResult.always { self.currentSend = nil }
102+
103+
// return the promise's future result (same as `queue.enqueue`)
104+
return promise.futureResult
60105
}
61106

62-
/// Sends `PostgreSQLMessage` to the server.
63-
func send(_ message: [PostgreSQLMessage]) -> Future<[PostgreSQLMessage]> {
64-
var responses: [PostgreSQLMessage] = []
65-
return send(message) { response in
66-
responses.append(response)
67-
}.map(to: [PostgreSQLMessage].self) {
68-
return responses
107+
/// Submits an async task to be pipelined.
108+
internal func operation(_ work: @escaping () -> Future<Void>) -> Future<Void> {
109+
/// perform this work when the current pipeline future is completed
110+
let new = pipeline.then(work)
111+
112+
/// append this work to the pipeline, discarding errors as the pipeline
113+
//// does not care about them
114+
pipeline = new.catchMap { err in
115+
return ()
69116
}
117+
118+
/// return the newly enqueued work's future result
119+
return new
70120
}
71121

72122
/// Authenticates the `PostgreSQLClient` using a username with no password.
@@ -136,6 +186,15 @@ public final class PostgreSQLConnection {
136186

137187
/// Closes this client.
138188
public func close() {
189+
isClosed = true
139190
channel.close(promise: nil)
140191
}
192+
193+
/// Called when this class deinitializes.
194+
deinit {
195+
close()
196+
}
197+
141198
}
199+
200+
private let closeError = PostgreSQLError(identifier: "closed", reason: "Connection is closed.", source: .capture())

Sources/PostgreSQL/Database/PostgreSQLDatabase.swift

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ public final class PostgreSQLDatabase: Database {
1414
}
1515

1616
/// See `Database.makeConnection()`
17-
public func makeConnection(on worker: Worker) -> Future<PostgreSQLConnection> {
17+
public func newConnection(on worker: Worker) -> Future<PostgreSQLConnection> {
1818
let config = self.config
1919
return Future.flatMap(on: worker) {
2020
return try PostgreSQLConnection.connect(hostname: config.hostname, port: config.port, on: worker) { error in
@@ -31,9 +31,6 @@ public final class PostgreSQLDatabase: Database {
3131
}
3232
}
3333

34-
/// A connection created by a `PostgreSQLDatabase`.
35-
extension PostgreSQLConnection: DatabaseConnection, BasicWorker { }
36-
3734
extension DatabaseIdentifier {
3835
/// Default identifier for `PostgreSQLDatabase`.
3936
public static var psql: DatabaseIdentifier<PostgreSQLDatabase> {

Tests/PostgreSQLTests/PostgreSQLConnectionTests.swift

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -331,10 +331,10 @@ class PostgreSQLConnectionTests: XCTestCase {
331331

332332

333333
/// SELECT
334-
let acronyms = try client.query("""
334+
let acronyms = client.query("""
335335
SELECT "acronyms".* FROM "acronyms" WHERE ("acronyms"."id" = $1) LIMIT 1 OFFSET 0
336336
""", [1])
337-
let categories = try client.query("""
337+
let categories = client.query("""
338338
SELECT "categories".* FROM "categories" WHERE ("categories"."id" = $1) LIMIT 1 OFFSET 0
339339
""", [1])
340340

0 commit comments

Comments
 (0)