Skip to content

Commit 027838d

Browse files
committed
nio
1 parent 2d93ec3 commit 027838d

File tree

5 files changed

+100
-17
lines changed

5 files changed

+100
-17
lines changed

Diff for: Package.swift

+8-11
Original file line numberDiff line numberDiff line change
@@ -7,26 +7,23 @@ let package = Package(
77
.library(name: "PostgreSQL", targets: ["PostgreSQL"]),
88
],
99
dependencies: [
10-
// ⏱ Promises and reactive-streams in Swift built for high-performance and scalability.
11-
.package(url: "https://github.com/vapor/async.git", from: "1.0.0-rc"),
12-
1310
// 🌎 Utility package containing tools for byte manipulation, Codable, OS APIs, and debugging.
14-
.package(url: "https://github.com/vapor/core.git", from: "3.0.0-rc"),
11+
.package(url: "https://github.com/vapor/core.git", .branch("nio")),
1512

1613
// 🔑 Hashing (BCrypt, SHA, HMAC, etc), encryption, and randomness.
17-
.package(url: "https://github.com/vapor/crypto.git", from: "3.0.0-rc"),
14+
.package(url: "https://github.com/vapor/crypto.git", .branch("nio")),
1815

1916
// 🗄 Core services for creating database integrations.
20-
.package(url: "https://github.com/vapor/database-kit.git", from: "1.0.0-rc"),
17+
.package(url: "https://github.com/vapor/database-kit.git", .branch("nio")),
2118

2219
// 📦 Dependency injection / inversion of control framework.
23-
.package(url: "https://github.com/vapor/service.git", from: "1.0.0-rc"),
24-
25-
// 🔌 Non-blocking TCP socket layer, with event-driven server and client.
26-
.package(url: "https://github.com/vapor/sockets.git", from: "3.0.0-rc"),
20+
.package(url: "https://github.com/vapor/service.git", .branch("nio")),
21+
22+
// Event-driven network application framework for high performance protocol servers & clients, non-blocking.
23+
.package(url: "https://github.com/apple/swift-nio.git", from: "1.0.0"),
2724
],
2825
targets: [
29-
.target(name: "PostgreSQL", dependencies: ["Async", "Bits", "Crypto", "DatabaseKit", "Service", "TCP"]),
26+
.target(name: "PostgreSQL", dependencies: ["Async", "Bits", "Crypto", "DatabaseKit", "NIO", "Service"]),
3027
.testTarget(name: "PostgreSQLTests", dependencies: ["PostgreSQL"]),
3128
]
3229
)
+15-6
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,27 @@
11
import Async
2-
import TCP
2+
import NIO
33

44
extension PostgreSQLConnection {
55
/// Connects to a Redis server using a TCP socket.
66
public static func connect(
77
hostname: String = "localhost",
88
port: UInt16 = 5432,
99
on worker: Worker,
10-
onError: @escaping TCPSocketSink.ErrorHandler
10+
onError: @escaping (Error) -> ()
1111
) throws -> PostgreSQLConnection {
12-
let socket = try TCPSocket(isNonBlocking: true)
13-
let client = try TCPClient(socket: socket)
14-
try client.connect(hostname: hostname, port: port)
15-
let stream = socket.stream(on: worker, onError: onError)
12+
let handler = HTTPClientHandler()
13+
let bootstrap = ClientBootstrap(group: group)
14+
// Enable SO_REUSEADDR.
15+
.channelOption(ChannelOptions.socket(SocketOptionLevel(SOL_SOCKET), SO_REUSEADDR), value: 1)
16+
.channelInitializer { channel in
17+
return channel.pipeline.addHTTPClientHandlers().then {
18+
channel.pipeline.add(handler: handler)
19+
}
20+
}
21+
22+
return bootstrap.connect(host: hostname, port: port).map(to: HTTPClient.self) { _ in
23+
return .init(handler: handler, bootstrap: bootstrap)
24+
}
1625
return PostgreSQLConnection(stream: stream, on: worker)
1726
}
1827
}

Diff for: Sources/PostgreSQL/Connection/PostgreSQLConnection.swift

+3
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import Async
22
import Crypto
3+
import NIO
34

45
/// A PostgreSQL frontend client.
56
public final class PostgreSQLConnection {
@@ -11,6 +12,8 @@ public final class PostgreSQLConnection {
1112

1213
/// Creates a new Redis client on the provided data source and sink.
1314
init<Stream>(stream: Stream, on worker: Worker) where Stream: ByteStream {
15+
16+
1417
let queueStream = QueueStream<PostgreSQLMessage, PostgreSQLMessage>()
1518

1619
let serializerStream = PostgreSQLMessageSerializer().stream(on: worker)

Diff for: Sources/PostgreSQL/PostgreSQLMessageDecoder.swift

+8
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
//
2+
// PostgreSQLMessageDecoder.swift
3+
// PostgreSQL
4+
//
5+
// Created by Tanner on 3/2/18.
6+
//
7+
8+
import Foundation

Diff for: Sources/PostgreSQL/QueueHandler.swift

+66
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
import Async
2+
import NIO
3+
4+
struct InputContext<In> {
5+
var promise: Promise<Void>
6+
var onInput: (In) throws -> Bool
7+
}
8+
9+
final class QueueHandler<In, Out>: ChannelInboundHandler {
10+
typealias InboundIn = In
11+
typealias OutboundOut = Out
12+
13+
private var inputQueue: [InputContext<InboundIn>]
14+
private var outputQueue: [OutboundOut]
15+
private let eventLoop: EventLoop
16+
private var waitingCtx: ChannelHandlerContext?
17+
18+
init(on worker: Worker) {
19+
self.inputQueue = []
20+
self.outputQueue = []
21+
}
22+
23+
func enqueue(_ output: [OutboundOut], onInput: @escaping (InboundIn) throws -> Bool) -> Future<Void> {
24+
outputQueue.insert(contentsOf: output.reversed(), at: 0)
25+
let promise = eventLoop.newPromise(Void.self)
26+
let context = InputContext<InboundIn>(promise: promise, onInput: onInput)
27+
inputQueue.insert(context, at: 0)
28+
if let ctx = waitingCtx {
29+
sendOutput(ctx: ctx)
30+
}
31+
return promise.futureResult
32+
}
33+
34+
func channelRead(ctx: ChannelHandlerContext, data: NIOAny) {
35+
let input = unwrapInboundIn(data)
36+
guard let current = inputQueue.last else {
37+
assert(false, "Empty input queue.")
38+
return
39+
}
40+
do {
41+
if try current.onInput(input) {
42+
current.promise.succeed()
43+
assert(inputQueue.popLast() != nil)
44+
}
45+
} catch {
46+
current.promise.fail(error: error)
47+
assert(inputQueue.popLast() != nil)
48+
}
49+
}
50+
51+
func channelActive(ctx: ChannelHandlerContext) {
52+
sendOutput(ctx: ctx)
53+
}
54+
55+
func sendOutput(ctx: ChannelHandlerContext) {
56+
if let next = outputQueue.popLast() {
57+
ctx.write(wrapOutboundOut(next)).do {
58+
self.sendOutput(ctx: ctx)
59+
}.catch { error in
60+
fatalError("\(error)")
61+
}
62+
} else {
63+
waitingCtx = ctx
64+
}
65+
}
66+
}

0 commit comments

Comments
 (0)