Skip to content

Commit 43e3a77

Browse files
alex-xnortanner0101
alex-xnor
authored andcommitted
Add support for receiving and dispatching NotificationResponse messages (vapor#60)
* Add support for receiving and dispatching NotificationResponse messages * Conform to a less powerful protocol * Handle notifications by inserting channel handler * Appropriately filter based on channel * Swap notification & context parameters in callback * Put PostgresNotificationHandlers at end * Explicit "self." * Log parse failures rather than passing down the chain * Documentation for PostgresListenContext * Rename listen to addListener * Add documentation for addListener
1 parent 37694b6 commit 43e3a77

7 files changed

+207
-1
lines changed

Sources/PostgresNIO/Connection/PostgresClient+Query.swift

+2
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,8 @@ private final class PostgresParameterizedQuery: PostgresRequest {
8080
return []
8181
case .notice:
8282
return []
83+
case .notificationResponse:
84+
return []
8385
case .readyForQuery:
8486
return nil
8587
default: throw PostgresError.protocol("Unexpected message during query: \(message)")

Sources/PostgresNIO/Connection/PostgresClient+SimpleQuery.swift

+2
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,8 @@ private final class PostgresSimpleQuery: PostgresRequest {
5454
return nil
5555
case .notice:
5656
return []
57+
case .notificationResponse:
58+
return []
5759
default:
5860
throw PostgresError.protocol("Unexpected message during simple query: \(message)")
5961
}

Sources/PostgresNIO/Connection/PostgresConnection+Connect.swift

+1-1
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ extension PostgresConnection {
2020
]).map {
2121
return PostgresConnection(channel: channel, logger: logger)
2222
}
23-
}.flatMap { conn in
23+
}.flatMap { (conn: PostgresConnection) in
2424
if let tlsConfiguration = tlsConfiguration {
2525
return conn.requestTLS(
2626
using: tlsConfiguration,
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
import NIO
2+
import Logging
3+
4+
/// Context for receiving NotificationResponse messages on a connection, used for PostgreSQL's `LISTEN`/`NOTIFY` support.
5+
public final class PostgresListenContext {
6+
var stopper: (() -> Void)?
7+
8+
/// Detach this listener so it no longer receives notifications. Other listeners, including those for the same channel, are unaffected. `UNLISTEN` is not sent; you are responsible for issuing an `UNLISTEN` query yourself if it is appropriate for your application.
9+
public func stop() {
10+
stopper?()
11+
stopper = nil
12+
}
13+
}
14+
15+
extension PostgresConnection {
16+
/// Add a handler for NotificationResponse messages on a certain channel. This is used in conjunction with PostgreSQL's `LISTEN`/`NOTIFY` support: to listen on a channel, you add a listener using this method to handle the NotificationResponse messages, then issue a `LISTEN` query to instruct PostgreSQL to begin sending NotificationResponse messages.
17+
@discardableResult
18+
public func addListener(channel: String, handler notificationHandler: @escaping (PostgresListenContext, PostgresMessage.NotificationResponse) -> Void) -> PostgresListenContext {
19+
let listenContext = PostgresListenContext()
20+
let channelHandler = PostgresNotificationHandler(logger: self.logger, channel: channel, notificationHandler: notificationHandler, listenContext: listenContext)
21+
let pipeline = self.channel.pipeline
22+
_ = pipeline.addHandler(channelHandler, name: nil, position: .last)
23+
listenContext.stopper = { [pipeline, unowned channelHandler] in
24+
_ = pipeline.removeHandler(channelHandler)
25+
}
26+
return listenContext
27+
}
28+
}
29+
30+
final class PostgresNotificationHandler: ChannelInboundHandler, RemovableChannelHandler {
31+
typealias InboundIn = PostgresMessage
32+
typealias InboundOut = PostgresMessage
33+
34+
let logger: Logger
35+
let channel: String
36+
let notificationHandler: (PostgresListenContext, PostgresMessage.NotificationResponse) -> Void
37+
let listenContext: PostgresListenContext
38+
39+
init(logger: Logger, channel: String, notificationHandler: @escaping (PostgresListenContext, PostgresMessage.NotificationResponse) -> Void, listenContext: PostgresListenContext) {
40+
self.logger = logger
41+
self.channel = channel
42+
self.notificationHandler = notificationHandler
43+
self.listenContext = listenContext
44+
}
45+
46+
func channelRead(context: ChannelHandlerContext, data: NIOAny) {
47+
let request = self.unwrapInboundIn(data)
48+
// Slightly complicated: We need to dispatch downstream _before_ we handle the notification ourselves, because the notification handler could try to stop the listen, which removes ourselves from the pipeline and makes fireChannelRead not work any more.
49+
context.fireChannelRead(self.wrapInboundOut(request))
50+
if request.identifier == .notificationResponse {
51+
do {
52+
var data = request.data
53+
let notification = try PostgresMessage.NotificationResponse.parse(from: &data)
54+
if notification.channel == channel {
55+
self.notificationHandler(self.listenContext, notification)
56+
}
57+
} catch let error {
58+
self.logger.error("\(error)")
59+
}
60+
}
61+
}
62+
}

Sources/PostgresNIO/Connection/PostgresRequest.swift

+2
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,8 @@ final class PostgresRequestHandler: ChannelDuplexHandler {
8989
} catch {
9090
self.errorCaught(context: context, error: error)
9191
}
92+
// Regardless of error, also pass the message downstream; this is necessary for PostgresNotificationHandler (which is appended at the end) to receive notifications
93+
context.fireChannelRead(data)
9294
}
9395

9496
func write(context: ChannelHandlerContext, data: NIOAny, promise: EventLoopPromise<Void>?) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
import NIO
2+
3+
extension PostgresMessage {
4+
/// Identifies the message as a notification response.
5+
public struct NotificationResponse: PostgresMessageType {
6+
public static let identifier = Identifier.notificationResponse
7+
8+
/// Parses an instance of this message type from a byte buffer.
9+
public static func parse(from buffer: inout ByteBuffer) throws -> Self {
10+
guard let backendPID: Int32 = buffer.readInteger() else {
11+
throw PostgresError.protocol("Invalid NotificationResponse message: unable to read backend PID")
12+
}
13+
guard let channel = buffer.readNullTerminatedString() else {
14+
throw PostgresError.protocol("Invalid NotificationResponse message: unable to read channel")
15+
}
16+
guard let payload = buffer.readNullTerminatedString() else {
17+
throw PostgresError.protocol("Invalid NotificationResponse message: unable to read payload")
18+
}
19+
return .init(backendPID: backendPID, channel: channel, payload: payload)
20+
}
21+
22+
public var backendPID: Int32
23+
public var channel: String
24+
public var payload: String
25+
}
26+
}

Tests/PostgresNIOTests/PostgresNIOTests.swift

+112
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,118 @@ final class PostgresNIOTests: XCTestCase {
6464
}
6565
}
6666

67+
func testNotificationsEmptyPayload() throws {
68+
let conn = try PostgresConnection.test(on: eventLoop).wait()
69+
defer { try! conn.close().wait() }
70+
var receivedNotifications: [PostgresMessage.NotificationResponse] = []
71+
conn.addListener(channel: "example") { context, notification in
72+
receivedNotifications.append(notification)
73+
}
74+
_ = try conn.simpleQuery("LISTEN example").wait()
75+
_ = try conn.simpleQuery("NOTIFY example").wait()
76+
// Notifications are asynchronous, so we should run at least one more query to make sure we'll have received the notification response by then
77+
_ = try conn.simpleQuery("SELECT 1").wait()
78+
XCTAssertEqual(receivedNotifications.count, 1)
79+
XCTAssertEqual(receivedNotifications[0].channel, "example")
80+
XCTAssertEqual(receivedNotifications[0].payload, "")
81+
}
82+
83+
func testNotificationsNonEmptyPayload() throws {
84+
let conn = try PostgresConnection.test(on: eventLoop).wait()
85+
defer { try! conn.close().wait() }
86+
var receivedNotifications: [PostgresMessage.NotificationResponse] = []
87+
conn.addListener(channel: "example") { context, notification in
88+
receivedNotifications.append(notification)
89+
}
90+
_ = try conn.simpleQuery("LISTEN example").wait()
91+
_ = try conn.simpleQuery("NOTIFY example, 'Notification payload example'").wait()
92+
// Notifications are asynchronous, so we should run at least one more query to make sure we'll have received the notification response by then
93+
_ = try conn.simpleQuery("SELECT 1").wait()
94+
XCTAssertEqual(receivedNotifications.count, 1)
95+
XCTAssertEqual(receivedNotifications[0].channel, "example")
96+
XCTAssertEqual(receivedNotifications[0].payload, "Notification payload example")
97+
}
98+
99+
func testNotificationsRemoveHandlerWithinHandler() throws {
100+
let conn = try PostgresConnection.test(on: eventLoop).wait()
101+
defer { try! conn.close().wait() }
102+
var receivedNotifications = 0
103+
conn.addListener(channel: "example") { context, notification in
104+
receivedNotifications += 1
105+
context.stop()
106+
}
107+
_ = try conn.simpleQuery("LISTEN example").wait()
108+
_ = try conn.simpleQuery("NOTIFY example").wait()
109+
_ = try conn.simpleQuery("NOTIFY example").wait()
110+
_ = try conn.simpleQuery("SELECT 1").wait()
111+
XCTAssertEqual(receivedNotifications, 1)
112+
}
113+
114+
func testNotificationsRemoveHandlerOutsideHandler() throws {
115+
let conn = try PostgresConnection.test(on: eventLoop).wait()
116+
defer { try! conn.close().wait() }
117+
var receivedNotifications = 0
118+
let context = conn.addListener(channel: "example") { context, notification in
119+
receivedNotifications += 1
120+
}
121+
_ = try conn.simpleQuery("LISTEN example").wait()
122+
_ = try conn.simpleQuery("NOTIFY example").wait()
123+
_ = try conn.simpleQuery("SELECT 1").wait()
124+
context.stop()
125+
_ = try conn.simpleQuery("NOTIFY example").wait()
126+
_ = try conn.simpleQuery("SELECT 1").wait()
127+
XCTAssertEqual(receivedNotifications, 1)
128+
}
129+
130+
func testNotificationsMultipleRegisteredHandlers() throws {
131+
let conn = try PostgresConnection.test(on: eventLoop).wait()
132+
defer { try! conn.close().wait() }
133+
var receivedNotifications1 = 0
134+
conn.addListener(channel: "example") { context, notification in
135+
receivedNotifications1 += 1
136+
}
137+
var receivedNotifications2 = 0
138+
conn.addListener(channel: "example") { context, notification in
139+
receivedNotifications2 += 1
140+
}
141+
_ = try conn.simpleQuery("LISTEN example").wait()
142+
_ = try conn.simpleQuery("NOTIFY example").wait()
143+
_ = try conn.simpleQuery("SELECT 1").wait()
144+
XCTAssertEqual(receivedNotifications1, 1)
145+
XCTAssertEqual(receivedNotifications2, 1)
146+
}
147+
148+
func testNotificationsMultipleRegisteredHandlersRemoval() throws {
149+
let conn = try PostgresConnection.test(on: eventLoop).wait()
150+
defer { try! conn.close().wait() }
151+
var receivedNotifications1 = 0
152+
conn.addListener(channel: "example") { context, notification in
153+
receivedNotifications1 += 1
154+
context.stop()
155+
}
156+
var receivedNotifications2 = 0
157+
conn.addListener(channel: "example") { context, notification in
158+
receivedNotifications2 += 1
159+
}
160+
_ = try conn.simpleQuery("LISTEN example").wait()
161+
_ = try conn.simpleQuery("NOTIFY example").wait()
162+
_ = try conn.simpleQuery("NOTIFY example").wait()
163+
_ = try conn.simpleQuery("SELECT 1").wait()
164+
XCTAssertEqual(receivedNotifications1, 1)
165+
XCTAssertEqual(receivedNotifications2, 2)
166+
}
167+
168+
func testNotificationHandlerFiltersOnChannel() throws {
169+
let conn = try PostgresConnection.test(on: eventLoop).wait()
170+
defer { try! conn.close().wait() }
171+
conn.addListener(channel: "desired") { context, notification in
172+
XCTFail("Received notification on channel that handler was not registered for")
173+
}
174+
_ = try conn.simpleQuery("LISTEN undesired").wait()
175+
_ = try conn.simpleQuery("NOTIFY undesired").wait()
176+
_ = try conn.simpleQuery("SELECT 1").wait()
177+
}
178+
67179
func testSelectTypes() throws {
68180
let conn = try PostgresConnection.test(on: eventLoop).wait()
69181
defer { try! conn.close().wait() }

0 commit comments

Comments
 (0)