Skip to content

Commit fa640ea

Browse files
committed
nio encoder/decoder
1 parent 027838d commit fa640ea

15 files changed

+651
-669
lines changed

Sources/PostgreSQL/Connection/PostgreSQLConnection+TCP.swift

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,23 +5,39 @@ extension PostgreSQLConnection {
55
/// Connects to a Redis server using a TCP socket.
66
public static func connect(
77
hostname: String = "localhost",
8-
port: UInt16 = 5432,
8+
port: Int = 5432,
99
on worker: Worker,
1010
onError: @escaping (Error) -> ()
11-
) throws -> PostgreSQLConnection {
12-
let handler = HTTPClientHandler()
13-
let bootstrap = ClientBootstrap(group: group)
11+
) throws -> Future<PostgreSQLConnection> {
12+
let handler = QueueHandler<PostgreSQLMessage, PostgreSQLMessage>(on: worker)
13+
let bootstrap = ClientBootstrap(group: worker.eventLoop)
1414
// Enable SO_REUSEADDR.
1515
.channelOption(ChannelOptions.socket(SocketOptionLevel(SOL_SOCKET), SO_REUSEADDR), value: 1)
1616
.channelInitializer { channel in
17-
return channel.pipeline.addHTTPClientHandlers().then {
17+
return channel.pipeline.addPostgreSQLClientHandlers().then {
1818
channel.pipeline.add(handler: handler)
1919
}
2020
}
2121

22-
return bootstrap.connect(host: hostname, port: port).map(to: HTTPClient.self) { _ in
23-
return .init(handler: handler, bootstrap: bootstrap)
22+
return bootstrap.connect(host: hostname, port: port).map(to: PostgreSQLConnection.self) { channel in
23+
return .init(queue: handler, channel: channel)
2424
}
25-
return PostgreSQLConnection(stream: stream, on: worker)
25+
}
26+
}
27+
28+
extension ChannelPipeline {
29+
func addPostgreSQLClientHandlers(first: Bool = false) -> EventLoopFuture<Void> {
30+
return addHandlers(PostgreSQLMessageEncoder(), PostgreSQLMessageDecoder(), first: first)
31+
}
32+
33+
/// Adds the provided channel handlers to the pipeline in the order given, taking account
34+
/// of the behaviour of `ChannelHandler.add(first:)`.
35+
private func addHandlers(_ handlers: ChannelHandler..., first: Bool) -> EventLoopFuture<Void> {
36+
var handlers = handlers
37+
if first {
38+
handlers = handlers.reversed()
39+
}
40+
41+
return EventLoopFuture<Void>.andAll(handlers.map { add(handler: $0) }, eventLoop: eventLoop)
2642
}
2743
}

Sources/PostgreSQL/Connection/PostgreSQLConnection.swift

Lines changed: 11 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -5,32 +5,24 @@ import NIO
55
/// A PostgreSQL frontend client.
66
public final class PostgreSQLConnection {
77
/// Handles enqueued redis commands and responses.
8-
private let queueStream: QueueStream<PostgreSQLMessage, PostgreSQLMessage>
8+
private let queue: QueueHandler<PostgreSQLMessage, PostgreSQLMessage>
9+
10+
/// The channel
11+
private let channel: Channel
912

1013
/// If non-nil, will log queries.
1114
public var logger: PostgreSQLLogger?
1215

1316
/// Creates a new Redis client on the provided data source and sink.
14-
init<Stream>(stream: Stream, on worker: Worker) where Stream: ByteStream {
15-
16-
17-
let queueStream = QueueStream<PostgreSQLMessage, PostgreSQLMessage>()
18-
19-
let serializerStream = PostgreSQLMessageSerializer().stream(on: worker)
20-
let parserStream = PostgreSQLMessageParser().stream(on: worker)
21-
22-
stream.stream(to: parserStream)
23-
.stream(to: queueStream)
24-
.stream(to: serializerStream)
25-
.output(to: stream)
26-
27-
self.queueStream = queueStream
17+
init(queue: QueueHandler<PostgreSQLMessage, PostgreSQLMessage>, channel: Channel) {
18+
self.queue = queue
19+
self.channel = channel
2820
}
2921

3022
/// Sends `PostgreSQLMessage` to the server.
3123
func send(_ messages: [PostgreSQLMessage], onResponse: @escaping (PostgreSQLMessage) throws -> ()) -> Future<Void> {
3224
var error: Error?
33-
return queueStream.enqueue(messages) { message in
25+
return queue.enqueue(messages) { message in
3426
switch message {
3527
case .readyForQuery:
3628
if let e = error { throw e }
@@ -60,7 +52,7 @@ public final class PostgreSQLConnection {
6052
"database": database ?? username
6153
])
6254
var authRequest: PostgreSQLAuthenticationRequest?
63-
return queueStream.enqueue([.startupMessage(startup)]) { message in
55+
return queue.enqueue([.startupMessage(startup)]) { message in
6456
switch message {
6557
case .authenticationRequest(let a):
6658
authRequest = a
@@ -114,7 +106,7 @@ public final class PostgreSQLConnection {
114106
input = [.password(passwordMessage)]
115107
}
116108

117-
return self.queueStream.enqueue(input) { message in
109+
return self.queue.enqueue(input) { message in
118110
switch message {
119111
case .error(let error): throw error
120112
case .readyForQuery: return true
@@ -128,6 +120,6 @@ public final class PostgreSQLConnection {
128120

129121
/// Closes this client.
130122
public func close() {
131-
queueStream.close()
123+
channel.close(promise: nil)
132124
}
133125
}

Sources/PostgreSQL/Database/PostgreSQLDatabase.swift

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -15,18 +15,18 @@ public final class PostgreSQLDatabase: Database {
1515

1616
/// See `Database.makeConnection()`
1717
public func makeConnection(on worker: Worker) -> Future<PostgreSQLConnection> {
18-
do {
19-
let client = try PostgreSQLConnection.connect(hostname: config.hostname, port: config.port, on: worker) { _, error in
18+
let config = self.config
19+
return Future.flatMap(on: worker) {
20+
return try PostgreSQLConnection.connect(hostname: config.hostname, port: config.port, on: worker) { error in
2021
print("[PostgreSQL] \(error)")
22+
}.flatMap(to: PostgreSQLConnection.self) { client in
23+
client.logger = self.logger
24+
return client.authenticate(
25+
username: config.username,
26+
database: config.database,
27+
password: config.password
28+
).transform(to: client)
2129
}
22-
client.logger = logger
23-
return client.authenticate(
24-
username: config.username,
25-
database: config.database,
26-
password: config.password
27-
).transform(to: client)
28-
} catch {
29-
return Future(error: error)
3030
}
3131
}
3232
}

Sources/PostgreSQL/Database/PostgreSQLDatabaseConfig.swift

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ public struct PostgreSQLDatabaseConfig {
99
public let hostname: String
1010

1111
/// Destination port.
12-
public let port: UInt16
12+
public let port: Int
1313

1414
/// Username to authenticate.
1515
public let username: String
@@ -22,7 +22,7 @@ public struct PostgreSQLDatabaseConfig {
2222
public let password: String?
2323

2424
/// Creates a new `PostgreSQLDatabaseConfig`.
25-
public init(hostname: String, port: UInt16, username: String, database: String? = nil, password: String? = nil) {
25+
public init(hostname: String, port: Int, username: String, database: String? = nil, password: String? = nil) {
2626
self.hostname = hostname
2727
self.port = port
2828
self.username = username

0 commit comments

Comments
 (0)