Skip to content

Commit 1823a1c

Browse files
authored
Merge pull request vapor#23 from vapor/nio
nio
2 parents 2d93ec3 + 9c2079d commit 1823a1c

39 files changed

+1003
-818
lines changed

Package.swift

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -7,26 +7,23 @@ let package = Package(
77
.library(name: "PostgreSQL", targets: ["PostgreSQL"]),
88
],
99
dependencies: [
10-
// ⏱ Promises and reactive-streams in Swift built for high-performance and scalability.
11-
.package(url: "https://github.com/vapor/async.git", from: "1.0.0-rc"),
12-
1310
// 🌎 Utility package containing tools for byte manipulation, Codable, OS APIs, and debugging.
14-
.package(url: "https://github.com/vapor/core.git", from: "3.0.0-rc"),
11+
.package(url: "https://github.com/vapor/core.git", .branch("master")),
1512

1613
// 🔑 Hashing (BCrypt, SHA, HMAC, etc), encryption, and randomness.
17-
.package(url: "https://github.com/vapor/crypto.git", from: "3.0.0-rc"),
14+
.package(url: "https://github.com/vapor/crypto.git", .branch("master")),
1815

1916
// 🗄 Core services for creating database integrations.
20-
.package(url: "https://github.com/vapor/database-kit.git", from: "1.0.0-rc"),
17+
.package(url: "https://github.com/vapor/database-kit.git", .branch("master")),
2118

2219
// 📦 Dependency injection / inversion of control framework.
23-
.package(url: "https://github.com/vapor/service.git", from: "1.0.0-rc"),
24-
25-
// 🔌 Non-blocking TCP socket layer, with event-driven server and client.
26-
.package(url: "https://github.com/vapor/sockets.git", from: "3.0.0-rc"),
20+
.package(url: "https://github.com/vapor/service.git", .branch("master")),
21+
22+
// Event-driven network application framework for high performance protocol servers & clients, non-blocking.
23+
.package(url: "https://github.com/apple/swift-nio.git", from: "1.0.0"),
2724
],
2825
targets: [
29-
.target(name: "PostgreSQL", dependencies: ["Async", "Bits", "Crypto", "DatabaseKit", "Service", "TCP"]),
30-
.testTarget(name: "PostgreSQLTests", dependencies: ["PostgreSQL"]),
26+
.target(name: "PostgreSQL", dependencies: ["Async", "Bits", "Core", "Crypto", "DatabaseKit", "NIO", "Service"]),
27+
.testTarget(name: "PostgreSQLTests", dependencies: ["Core", "PostgreSQL"]),
3128
]
3229
)

Sources/PostgreSQL/Connection/PostgreSQLConnection+Query.swift

Lines changed: 33 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,12 @@ extension PostgreSQLConnection {
44
/// Sends a parameterized PostgreSQL query command, collecting the parsed results.
55
public func query(
66
_ string: String,
7-
_ parameters: [PostgreSQLDataCustomConvertible] = []
8-
) throws -> Future<[[String: PostgreSQLData]]> {
9-
var rows: [[String: PostgreSQLData]] = []
7+
_ parameters: [PostgreSQLDataConvertible] = []
8+
) throws -> Future<[[PostgreSQLColumn: PostgreSQLData]]> {
9+
var rows: [[PostgreSQLColumn: PostgreSQLData]] = []
1010
return try query(string, parameters) { row in
1111
rows.append(row)
12-
}.map(to: [[String: PostgreSQLData]].self) {
12+
}.map(to: [[PostgreSQLColumn: PostgreSQLData]].self) {
1313
return rows
1414
}
1515
}
@@ -18,59 +18,50 @@ extension PostgreSQLConnection {
1818
/// the supplied closure.
1919
public func query(
2020
_ string: String,
21-
_ parameters: [PostgreSQLDataCustomConvertible] = [],
21+
_ parameters: [PostgreSQLDataConvertible] = [],
2222
resultFormat: PostgreSQLResultFormat = .binary(),
23-
onRow: @escaping ([String: PostgreSQLData]) -> ()
23+
onRow: @escaping ([PostgreSQLColumn: PostgreSQLData]) throws -> ()
2424
) throws -> Future<Void> {
2525
let parameters = try parameters.map { try $0.convertToPostgreSQLData() }
26-
logger?.log(query: string, parameters: parameters)
2726
let parse = PostgreSQLParseRequest(
2827
statementName: "",
2928
query: string,
3029
parameterTypes: parameters.map { $0.type }
3130
)
32-
let describe = PostgreSQLDescribeRequest(type: .statement, name: "")
31+
let describe = PostgreSQLDescribeRequest(
32+
type: .statement,
33+
name: ""
34+
)
35+
let bind = PostgreSQLBindRequest(
36+
portalName: "",
37+
statementName: "",
38+
parameterFormatCodes: parameters.map { $0.format },
39+
parameters: parameters.map { .init(data: $0.data) },
40+
resultFormatCodes: resultFormat.formatCodes
41+
)
42+
let execute = PostgreSQLExecuteRequest(
43+
portalName: "",
44+
maxRows: 0
45+
)
46+
3347
var currentRow: PostgreSQLRowDescription?
34-
35-
return send([
36-
.parse(parse), .describe(describe), .sync
48+
return self.send([
49+
.parse(parse), .describe(describe), .bind(bind), .execute(execute), .sync
3750
]) { message in
3851
switch message {
3952
case .parseComplete: break
40-
case .rowDescription(let row): currentRow = row
4153
case .parameterDescription: break
4254
case .noData: break
43-
default: throw PostgreSQLError(identifier: "query", reason: "Unexpected message during PostgreSQLParseRequest: \(message)", source: .capture())
44-
}
45-
}.flatMap(to: Void.self) {
46-
let resultFormats = resultFormat.formatCodeFactory(currentRow?.fields.map { $0.dataType } ?? [])
47-
// cache so we don't compute twice
48-
let bind = PostgreSQLBindRequest(
49-
portalName: "",
50-
statementName: "",
51-
parameterFormatCodes: parameters.map { $0.format },
52-
parameters: parameters.map { .init(data: $0.data) },
53-
resultFormatCodes: resultFormats
54-
)
55-
let execute = PostgreSQLExecuteRequest(
56-
portalName: "",
57-
maxRows: 0
58-
)
59-
return self.send([
60-
.bind(bind), .execute(execute), .sync
61-
]) { message in
62-
switch message {
63-
case .bindComplete: break
64-
case .dataRow(let data):
65-
guard let row = currentRow else {
66-
throw PostgreSQLError(identifier: "query", reason: "Unexpected PostgreSQLDataRow without preceding PostgreSQLRowDescription.", source: .capture())
67-
}
68-
let parsed = try row.parse(data: data, formatCodes: resultFormats)
69-
onRow(parsed)
70-
case .close: break
71-
case .noData: break
72-
default: throw PostgreSQLError(identifier: "query", reason: "Unexpected message during PostgreSQLParseRequest: \(message)", source: .capture())
55+
case .bindComplete: break
56+
case .rowDescription(let row): currentRow = row
57+
case .dataRow(let data):
58+
guard let row = currentRow else {
59+
throw PostgreSQLError(identifier: "query", reason: "Unexpected PostgreSQLDataRow without preceding PostgreSQLRowDescription.", source: .capture())
7360
}
61+
let parsed = try row.parse(data: data, formatCodes: resultFormat.formatCodes)
62+
try onRow(parsed)
63+
case .close: break
64+
default: throw PostgreSQLError(identifier: "query", reason: "Unexpected message during PostgreSQLParseRequest: \(message)", source: .capture())
7465
}
7566
}
7667
}

Sources/PostgreSQL/Connection/PostgreSQLConnection+SimpleQuery.swift

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,18 +2,18 @@ import Async
22

33
extension PostgreSQLConnection {
44
/// Sends a simple PostgreSQL query command, collecting the parsed results.
5-
public func simpleQuery(_ string: String) -> Future<[[String: PostgreSQLData]]> {
6-
var rows: [[String: PostgreSQLData]] = []
5+
public func simpleQuery(_ string: String) -> Future<[[PostgreSQLColumn: PostgreSQLData]]> {
6+
var rows: [[PostgreSQLColumn: PostgreSQLData]] = []
77
return simpleQuery(string) { row in
88
rows.append(row)
9-
}.map(to: [[String: PostgreSQLData]].self) {
9+
}.map(to: [[PostgreSQLColumn: PostgreSQLData]].self) {
1010
return rows
1111
}
1212
}
1313

1414
/// Sends a simple PostgreSQL query command, returning the parsed results to
1515
/// the supplied closure.
16-
public func simpleQuery(_ string: String, onRow: @escaping ([String: PostgreSQLData]) -> ()) -> Future<Void> {
16+
public func simpleQuery(_ string: String, onRow: @escaping ([PostgreSQLColumn: PostgreSQLData]) -> ()) -> Future<Void> {
1717
logger?.log(query: string, parameters: [])
1818
var currentRow: PostgreSQLRowDescription?
1919
let query = PostgreSQLQuery(query: string)
Lines changed: 34 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,43 @@
11
import Async
2-
import TCP
2+
import NIO
33

44
extension PostgreSQLConnection {
55
/// Connects to a Redis server using a TCP socket.
66
public static func connect(
77
hostname: String = "localhost",
8-
port: UInt16 = 5432,
8+
port: Int = 5432,
99
on worker: Worker,
10-
onError: @escaping TCPSocketSink.ErrorHandler
11-
) throws -> PostgreSQLConnection {
12-
let socket = try TCPSocket(isNonBlocking: true)
13-
let client = try TCPClient(socket: socket)
14-
try client.connect(hostname: hostname, port: port)
15-
let stream = socket.stream(on: worker, onError: onError)
16-
return PostgreSQLConnection(stream: stream, on: worker)
10+
onError: @escaping (Error) -> ()
11+
) throws -> Future<PostgreSQLConnection> {
12+
let handler = QueueHandler<PostgreSQLMessage, PostgreSQLMessage>(on: worker, onError: onError)
13+
let bootstrap = ClientBootstrap(group: worker.eventLoop)
14+
// Enable SO_REUSEADDR.
15+
.channelOption(ChannelOptions.socket(SocketOptionLevel(SOL_SOCKET), SO_REUSEADDR), value: 1)
16+
.channelInitializer { channel in
17+
return channel.pipeline.addPostgreSQLClientHandlers().then {
18+
channel.pipeline.add(handler: handler)
19+
}
20+
}
21+
22+
return bootstrap.connect(host: hostname, port: port).map(to: PostgreSQLConnection.self) { channel in
23+
return .init(queue: handler, channel: channel)
24+
}
25+
}
26+
}
27+
28+
extension ChannelPipeline {
29+
func addPostgreSQLClientHandlers(first: Bool = false) -> EventLoopFuture<Void> {
30+
return addHandlers(PostgreSQLMessageEncoder(), PostgreSQLMessageDecoder(), first: first)
31+
}
32+
33+
/// Adds the provided channel handlers to the pipeline in the order given, taking account
34+
/// of the behaviour of `ChannelHandler.add(first:)`.
35+
private func addHandlers(_ handlers: ChannelHandler..., first: Bool) -> EventLoopFuture<Void> {
36+
var handlers = handlers
37+
if first {
38+
handlers = handlers.reversed()
39+
}
40+
41+
return EventLoopFuture<Void>.andAll(handlers.map { add(handler: $0) }, eventLoop: eventLoop)
1742
}
1843
}

Sources/PostgreSQL/Connection/PostgreSQLConnection.swift

Lines changed: 34 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,33 +1,52 @@
11
import Async
22
import Crypto
3+
import NIO
34

45
/// A PostgreSQL frontend client.
56
public final class PostgreSQLConnection {
7+
public var eventLoop: EventLoop {
8+
return channel.eventLoop
9+
}
10+
611
/// Handles enqueued redis commands and responses.
7-
private let queueStream: QueueStream<PostgreSQLMessage, PostgreSQLMessage>
12+
private let queue: QueueHandler<PostgreSQLMessage, PostgreSQLMessage>
13+
14+
/// The channel
15+
private let channel: Channel
816

917
/// If non-nil, will log queries.
1018
public var logger: PostgreSQLLogger?
1119

12-
/// Creates a new Redis client on the provided data source and sink.
13-
init<Stream>(stream: Stream, on worker: Worker) where Stream: ByteStream {
14-
let queueStream = QueueStream<PostgreSQLMessage, PostgreSQLMessage>()
20+
/// Returns a new unique portal name.
21+
internal var nextPortalName: String {
22+
defer { uniqueNameCounter = uniqueNameCounter &+ 1 }
23+
return "p_\(uniqueNameCounter)"
24+
}
1525

16-
let serializerStream = PostgreSQLMessageSerializer().stream(on: worker)
17-
let parserStream = PostgreSQLMessageParser().stream(on: worker)
26+
/// Returns a new unique statement name.
27+
internal var nextStatementName: String {
28+
defer { uniqueNameCounter = uniqueNameCounter &+ 1 }
29+
return "s_\(uniqueNameCounter)"
30+
}
1831

19-
stream.stream(to: parserStream)
20-
.stream(to: queueStream)
21-
.stream(to: serializerStream)
22-
.output(to: stream)
32+
/// A unique identifier for this connection, used to generate statment and portal names
33+
private var uniqueNameCounter: UInt8
2334

24-
self.queueStream = queueStream
35+
/// Creates a new Redis client on the provided data source and sink.
36+
init(queue: QueueHandler<PostgreSQLMessage, PostgreSQLMessage>, channel: Channel) {
37+
self.queue = queue
38+
self.channel = channel
39+
self.uniqueNameCounter = 0
40+
}
41+
42+
deinit {
43+
close()
2544
}
2645

2746
/// Sends `PostgreSQLMessage` to the server.
2847
func send(_ messages: [PostgreSQLMessage], onResponse: @escaping (PostgreSQLMessage) throws -> ()) -> Future<Void> {
2948
var error: Error?
30-
return queueStream.enqueue(messages) { message in
49+
return queue.enqueue(messages) { message in
3150
switch message {
3251
case .readyForQuery:
3352
if let e = error { throw e }
@@ -57,7 +76,7 @@ public final class PostgreSQLConnection {
5776
"database": database ?? username
5877
])
5978
var authRequest: PostgreSQLAuthenticationRequest?
60-
return queueStream.enqueue([.startupMessage(startup)]) { message in
79+
return queue.enqueue([.startupMessage(startup)]) { message in
6180
switch message {
6281
case .authenticationRequest(let a):
6382
authRequest = a
@@ -111,7 +130,7 @@ public final class PostgreSQLConnection {
111130
input = [.password(passwordMessage)]
112131
}
113132

114-
return self.queueStream.enqueue(input) { message in
133+
return self.queue.enqueue(input) { message in
115134
switch message {
116135
case .error(let error): throw error
117136
case .readyForQuery: return true
@@ -125,6 +144,6 @@ public final class PostgreSQLConnection {
125144

126145
/// Closes this client.
127146
public func close() {
128-
queueStream.close()
147+
channel.close(promise: nil)
129148
}
130149
}

Sources/PostgreSQL/Data/PostgreSQLArrayCustomConvertible.swift

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import Foundation
22

33
/// Representable by a `T[]` column on the PostgreSQL database.
4-
public protocol PostgreSQLArrayCustomConvertible: PostgreSQLDataCustomConvertible {
4+
public protocol PostgreSQLArrayCustomConvertible: PostgreSQLDataConvertible {
55
/// The associated array element type
66
associatedtype PostgreSQLArrayElement // : PostgreSQLDataCustomConvertible
77

@@ -127,15 +127,15 @@ extension Array: PostgreSQLArrayCustomConvertible {
127127
}
128128
}
129129

130-
func requirePostgreSQLDataCustomConvertible<T>(_ type: T.Type) -> PostgreSQLDataCustomConvertible.Type {
131-
guard let custom = T.self as? PostgreSQLDataCustomConvertible.Type else {
130+
func requirePostgreSQLDataCustomConvertible<T>(_ type: T.Type) -> PostgreSQLDataConvertible.Type {
131+
guard let custom = T.self as? PostgreSQLDataConvertible.Type else {
132132
fatalError("`\(T.self)` does not conform to `PostgreSQLDataCustomConvertible`")
133133
}
134134
return custom
135135
}
136136

137-
func requirePostgreSQLDataCustomConvertible<T>(_ type: T) -> PostgreSQLDataCustomConvertible {
138-
guard let custom = type as? PostgreSQLDataCustomConvertible else {
137+
func requirePostgreSQLDataCustomConvertible<T>(_ type: T) -> PostgreSQLDataConvertible {
138+
guard let custom = type as? PostgreSQLDataConvertible else {
139139
fatalError("`\(T.self)` does not conform to `PostgreSQLDataCustomConvertible`")
140140
}
141141
return custom
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/// Represents a PostgreSQL column.
2+
public struct PostgreSQLColumn: Hashable, Equatable {
3+
/// The table this column belongs to.
4+
public var tableOID: Int32
5+
6+
/// The column's name.
7+
public var name: String
8+
}
9+
10+
extension PostgreSQLColumn: CustomStringConvertible {
11+
public var description: String {
12+
return "<\(tableOID)>.(\(name))"
13+
}
14+
}
15+
16+
extension Dictionary where Key == PostgreSQLColumn {
17+
/// Accesses the _first_ value from this dictionary with a matching field name.
18+
public func firstValue(forColumn columnName: String) -> Value? {
19+
for (field, value) in self {
20+
if field.name == columnName {
21+
return value
22+
}
23+
}
24+
return nil
25+
}
26+
27+
/// Access a `Value` from this dictionary keyed by `PostgreSQLColumn`s
28+
/// using a field (column) name and entity (table) name.
29+
public func value(forTableOID tableOID: Int32, atColumn column: String) -> Value? {
30+
return self[PostgreSQLColumn(tableOID: tableOID, name: column)]
31+
}
32+
}

Sources/PostgreSQL/Data/PostgreSQLData+BinaryFloatingPoint.swift

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,8 +64,8 @@ extension BinaryFloatingPoint {
6464
}
6565
}
6666

67-
extension Double: PostgreSQLDataCustomConvertible { }
68-
extension Float: PostgreSQLDataCustomConvertible { }
67+
extension Double: PostgreSQLDataConvertible { }
68+
extension Float: PostgreSQLDataConvertible { }
6969

7070
extension Data {
7171
/// Converts this data to a floating-point number.

Sources/PostgreSQL/Data/PostgreSQLData+Bool.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import Foundation
22

3-
extension Bool: PostgreSQLDataCustomConvertible {
3+
extension Bool: PostgreSQLDataConvertible {
44
/// See `PostgreSQLDataCustomConvertible.postgreSQLDataType`
55
public static var postgreSQLDataType: PostgreSQLDataType { return .bool }
66

Sources/PostgreSQL/Data/PostgreSQLData+Data.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import Foundation
22

3-
extension Data: PostgreSQLDataCustomConvertible {
3+
extension Data: PostgreSQLDataConvertible {
44
/// See `PostgreSQLDataCustomConvertible.postgreSQLDataType`
55
public static var postgreSQLDataType: PostgreSQLDataType { return .bytea }
66

0 commit comments

Comments
 (0)