Skip to content

Commit 037c0c7

Browse files
authored
Send terminate message on close (vapor#86)
* send postgres terminate message when closing * fix logging
1 parent 3bf4f6e commit 037c0c7

13 files changed

+69
-44
lines changed

Sources/PostgresNIO/Connection/PostgresConnection+Connect.swift

+2-2
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,8 @@ extension PostgresConnection {
1313
.channelOption(ChannelOptions.socket(SocketOptionLevel(SOL_SOCKET), SO_REUSEADDR), value: 1)
1414
return bootstrap.connect(to: socketAddress).flatMap { channel in
1515
return channel.pipeline.addHandlers([
16-
ByteToMessageHandler(PostgresMessageDecoder()),
17-
MessageToByteHandler(PostgresMessageEncoder()),
16+
ByteToMessageHandler(PostgresMessageDecoder(logger: logger)),
17+
MessageToByteHandler(PostgresMessageEncoder(logger: logger)),
1818
PostgresRequestHandler(logger: logger),
1919
PostgresErrorHandler(logger: logger)
2020
]).map {

Sources/PostgresNIO/Connection/PostgresRequest.swift renamed to Sources/PostgresNIO/Connection/PostgresConnection+Database.swift

+14-18
Original file line numberDiff line numberDiff line change
@@ -12,24 +12,17 @@ extension PostgresConnection: PostgresDatabase {
1212
self.channel.flush()
1313
return promise.futureResult
1414
}
15-
15+
1616
public func withConnection<T>(_ closure: (PostgresConnection) -> EventLoopFuture<T>) -> EventLoopFuture<T> {
1717
closure(self)
1818
}
1919
}
2020

21-
public protocol PostgresRequest {
22-
// return nil to end request
23-
func respond(to message: PostgresMessage) throws -> [PostgresMessage]?
24-
func start() throws -> [PostgresMessage]
25-
func log(to logger: Logger)
26-
}
27-
2821
final class PostgresRequestContext {
2922
let delegate: PostgresRequest
3023
let promise: EventLoopPromise<Void>
3124
var lastError: Error?
32-
25+
3326
init(delegate: PostgresRequest, promise: EventLoopPromise<Void>) {
3427
self.delegate = delegate
3528
self.promise = promise
@@ -40,23 +33,23 @@ final class PostgresRequestHandler: ChannelDuplexHandler {
4033
typealias InboundIn = PostgresMessage
4134
typealias OutboundIn = PostgresRequestContext
4235
typealias OutboundOut = PostgresMessage
43-
36+
4437
private var queue: [PostgresRequestContext]
4538
let logger: Logger
46-
39+
4740
public init(logger: Logger) {
4841
self.queue = []
4942
self.logger = logger
5043
}
51-
44+
5245
private func _channelRead(context: ChannelHandlerContext, data: NIOAny) throws {
5346
let message = self.unwrapInboundIn(data)
5447
guard self.queue.count > 0 else {
5548
// discard packet
5649
return
5750
}
5851
let request = self.queue[0]
59-
52+
6053
switch message.identifier {
6154
case .error:
6255
let error = try PostgresMessage.Error(message: message)
@@ -67,7 +60,7 @@ final class PostgresRequestHandler: ChannelDuplexHandler {
6760
self.logger.notice("\(notice)")
6861
default: break
6962
}
70-
63+
7164
if let responses = try request.delegate.respond(to: message) {
7265
for response in responses {
7366
context.write(self.wrapOutboundOut(response), promise: nil)
@@ -82,7 +75,7 @@ final class PostgresRequestHandler: ChannelDuplexHandler {
8275
}
8376
}
8477
}
85-
78+
8679
func channelRead(context: ChannelHandlerContext, data: NIOAny) {
8780
do {
8881
try self._channelRead(context: context, data: data)
@@ -92,7 +85,7 @@ final class PostgresRequestHandler: ChannelDuplexHandler {
9285
// Regardless of error, also pass the message downstream; this is necessary for PostgresNotificationHandler (which is appended at the end) to receive notifications
9386
context.fireChannelRead(data)
9487
}
95-
88+
9689
func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise<Void>?) {
9790
let request = self.unwrapOutboundIn(data)
9891
self.queue.append(request)
@@ -105,13 +98,16 @@ final class PostgresRequestHandler: ChannelDuplexHandler {
10598
self.errorCaught(context: context, error: error)
10699
}
107100
}
108-
101+
109102
func close(context: ChannelHandlerContext, mode: CloseMode, promise: EventLoopPromise<Void>?) {
103+
let terminate = try! PostgresMessage.Terminate().message()
104+
context.write(self.wrapOutboundOut(terminate), promise: nil)
105+
context.close(mode: mode, promise: promise)
106+
110107
for current in self.queue {
111108
current.promise.fail(PostgresError.connectionClosed)
112109
}
113110
self.queue = []
114-
context.close(mode: mode, promise: promise)
115111
}
116112
}
117113

Sources/PostgresNIO/Connection/PostgresConnection.swift

+5-11
Original file line numberDiff line numberDiff line change
@@ -31,17 +31,11 @@ public final class PostgresConnection {
3131
return self.eventLoop.makeSucceededFuture(())
3232
}
3333
self.didClose = true
34-
35-
let promise = self.eventLoop.makePromise(of: Void.self)
36-
self.eventLoop.submit {
37-
switch self.channel.isActive {
38-
case true:
39-
promise.succeed(())
40-
case false:
41-
self.channel.close(mode: .all, promise: promise)
42-
}
43-
}.cascadeFailure(to: promise)
44-
return promise.futureResult
34+
if !self.isClosed {
35+
return self.channel.close(mode: .all)
36+
} else {
37+
return self.eventLoop.makeSucceededFuture(())
38+
}
4539
}
4640

4741
deinit {

Sources/PostgresNIO/Message/PostgresMessage+BackendKeyData.swift

+5-6
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,11 @@ import NIO
33
extension PostgresMessage {
44
/// Identifies the message as cancellation key data.
55
/// The frontend must save these values if it wishes to be able to issue CancelRequest messages later.
6-
public struct BackendKeyData: CustomStringConvertible {
6+
public struct BackendKeyData: PostgresMessageType {
7+
public static var identifier: PostgresMessage.Identifier {
8+
.backendKeyData
9+
}
10+
711
/// Parses an instance of this message type from a byte buffer.
812
public static func parse(from buffer: inout ByteBuffer) throws -> BackendKeyData {
913
guard let processID = buffer.readInteger(as: Int32.self) else {
@@ -20,10 +24,5 @@ extension PostgresMessage {
2024

2125
/// The secret key of this backend.
2226
public var secretKey: Int32
23-
24-
/// See `CustomStringConvertible`.
25-
public var description: String {
26-
return "processID: \(processID), secretKey: \(secretKey)"
27-
}
2827
}
2928
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
extension PostgresMessage {
2+
public struct Terminate: PostgresMessageType {
3+
public static var identifier: PostgresMessage.Identifier {
4+
.terminate
5+
}
6+
7+
public func serialize(into buffer: inout ByteBuffer) { }
8+
}
9+
}

Sources/PostgresNIO/Message/PostgresMessageDecoder.swift

+6-1
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,14 @@ public final class PostgresMessageDecoder: ByteToMessageDecoder {
99

1010
/// If `true`, the server has asked for authentication.
1111
public var hasSeenFirstMessage: Bool
12+
13+
/// Logger to send debug messages to.
14+
let logger: Logger?
1215

1316
/// Creates a new `PostgresMessageDecoder`.
14-
public init() {
17+
public init(logger: Logger? = nil) {
1518
self.hasSeenFirstMessage = false
19+
self.logger = logger
1620
}
1721

1822
/// See `ByteToMessageDecoder`.
@@ -48,6 +52,7 @@ public final class PostgresMessageDecoder: ByteToMessageDecoder {
4852

4953
// there is sufficient data, use this buffer
5054
buffer = peekBuffer
55+
self.logger?.trace("Decoded: PostgresMessage (\(message.identifier))")
5156
context.fireChannelRead(wrapInboundOut(message))
5257
return .continue
5358
}

Sources/PostgresNIO/Message/PostgresMessageEncoder.swift

+9-2
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,17 @@ import NIO
33
public final class PostgresMessageEncoder: MessageToByteEncoder {
44
/// See `MessageToByteEncoder`.
55
public typealias OutboundIn = PostgresMessage
6+
7+
/// Logger to send debug messages to.
8+
let logger: Logger?
9+
10+
/// Creates a new `PostgresMessageEncoder`.
11+
public init(logger: Logger? = nil) {
12+
self.logger = logger
13+
}
614

715
/// See `MessageToByteEncoder`.
816
public func encode(data message: PostgresMessage, out: inout ByteBuffer) throws {
9-
// print("PostgresMessage.ChannelEncoder.encode(\(data))")
10-
1117
// serialize identifier
1218
var message = message
1319
switch message.identifier {
@@ -25,6 +31,7 @@ public final class PostgresMessageEncoder: MessageToByteEncoder {
2531

2632
// set message size
2733
out.setInteger(Int32(out.writerIndex - messageSizeIndex), at: messageSizeIndex)
34+
self.logger?.trace("Encoded: PostgresMessage (\(message.identifier))")
2835
}
2936
}
3037

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
import Logging
2+
3+
public protocol PostgresRequest {
4+
// return nil to end request
5+
func respond(to message: PostgresMessage) throws -> [PostgresMessage]?
6+
func start() throws -> [PostgresMessage]
7+
func log(to logger: Logger)
8+
}

Tests/PostgresNIOTests/PostgresNIOTests.swift

+11-1
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ final class PostgresNIOTests: XCTestCase {
1010
}
1111

1212
override func setUp() {
13-
testLogLevel = .info
13+
XCTAssertTrue(isLoggingConfigured)
1414
self.group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
1515
}
1616

@@ -1164,3 +1164,13 @@ private func prepareTableToMeasureSelectPerformance(
11641164
_ = try conn.query(insertQuery, batchedFixtureData).wait()
11651165
}
11661166
}
1167+
1168+
1169+
let isLoggingConfigured: Bool = {
1170+
LoggingSystem.bootstrap { label in
1171+
var handler = StreamLogHandler.standardOutput(label: label)
1172+
handler.logLevel = .debug
1173+
return handler
1174+
}
1175+
return true
1176+
}()

Tests/PostgresNIOTests/Utilities.swift

-3
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
import Logging
22
import PostgresNIO
33

4-
var testLogLevel: Logger.Level = .info
5-
64
extension PostgresConnection {
75
static func address() throws -> SocketAddress {
86
#if os(Linux)
@@ -27,7 +25,6 @@ extension PostgresConnection {
2725
database: "vapor_database",
2826
password: "vapor_password"
2927
).map {
30-
conn.logger.logLevel = testLogLevel
3128
return conn
3229
}
3330
}

0 commit comments

Comments
 (0)