Skip to content

Commit 97ccb06

Browse files
author
Shaun Hubbard
committed
Add Listen/Notify functionality to PostgreSQL
1 parent 0267e10 commit 97ccb06

File tree

6 files changed

+79
-3
lines changed

6 files changed

+79
-3
lines changed
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
import Async
2+
3+
extension PostgreSQLConnection {
4+
public func listen(
5+
_ channelName: String,
6+
handler: @escaping (String) throws -> ()
7+
) throws -> Future<Void> {
8+
beforeClose = { conn in
9+
let query = PostgreSQLQuery(query: "UNLISTEN \(channelName);")
10+
return conn.send([.query(query)], onResponse: { _ in })
11+
}
12+
let query = PostgreSQLQuery(query: "LISTEN \(channelName);")
13+
return queue.enqueue([.query(query)], onInput: { message in
14+
switch message {
15+
case let .notificationResponse(notification):
16+
try handler(notification.message)
17+
return true
18+
default:
19+
return false
20+
}
21+
})
22+
}
23+
24+
public func notify(
25+
_ channelName: String, message: String) throws -> Future<Void> {
26+
let query = PostgreSQLQuery(query: "NOTIFY \(channelName), '\(message)';")
27+
return send([.query(query)]).map(to: Void.self, { _ in })
28+
}
29+
}

Sources/PostgreSQL/Connection/PostgreSQLConnection.swift

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ public final class PostgreSQLConnection {
99
}
1010

1111
/// Handles enqueued redis commands and responses.
12-
private let queue: QueueHandler<PostgreSQLMessage, PostgreSQLMessage>
12+
internal let queue: QueueHandler<PostgreSQLMessage, PostgreSQLMessage>
1313

1414
/// The channel
1515
private let channel: Channel
@@ -134,8 +134,17 @@ public final class PostgreSQLConnection {
134134
}
135135
}
136136

137+
internal var beforeClose: ((PostgreSQLConnection) -> Future<Void>)?
138+
137139
/// Closes this client.
138140
public func close() {
139-
channel.close(promise: nil)
141+
if let beforeClose = beforeClose {
142+
_ = beforeClose(self).then { _ in
143+
self.channel.close(mode: CloseMode.all)
144+
}
145+
} else {
146+
channel.close(promise: nil)
147+
}
148+
140149
}
141150
}

Sources/PostgreSQL/Message+Parse/PostgreSQLMessageDecoder.swift

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ final class PostgreSQLMessageDecoder: ByteToMessageDecoder {
4949
let decoder = _PostgreSQLMessageDecoder(data: messageData)
5050
let message: PostgreSQLMessage
5151
switch messageType {
52+
case .A: message = try .notificationResponse(decoder.decode())
5253
case .E: message = try .error(decoder.decode())
5354
case .N: message = try .notice(decoder.decode())
5455
case .R: message = try .authenticationRequest(decoder.decode())

Sources/PostgreSQL/Message/PostgreSQLMessage.swift

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ enum PostgreSQLMessage {
77
case error(PostgreSQLDiagnosticResponse)
88
/// Identifies the message as a notice.
99
case notice(PostgreSQLDiagnosticResponse)
10+
/// Identifies the message as a notification response.
11+
case notificationResponse(PostgreSQLNotificationResponse)
1012
/// One of the various authentication request message formats.
1113
case authenticationRequest(PostgreSQLAuthenticationRequest)
1214
/// Identifies the message as a password response.
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
import Foundation
2+
3+
struct PostgreSQLNotificationResponse: Decodable {
4+
/// The message coming from PSQL
5+
let message: String
6+
init(from decoder: Decoder) throws {
7+
let container = try decoder.singleValueContainer()
8+
_ = try container.decode(Int32.self) // message length
9+
_ = try container.decode(Int32.self) // process id of message
10+
message = try container.decode(String.self)
11+
}
12+
}

Tests/PostgreSQLTests/PostgreSQLConnectionTests.swift

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -342,6 +342,25 @@ class PostgreSQLConnectionTests: XCTestCase {
342342
_ = try categories.wait()
343343
}
344344

345+
func testNotifyAndListen() throws {
346+
let notifyConn = try PostgreSQLConnection.makeTest()
347+
let listenConn = try PostgreSQLConnection.makeTest()
348+
var messageReceived = false
349+
//listenConn
350+
let channelName = "Foo"
351+
let messageText = "Bar"
352+
try listenConn.listen(channelName) { text in
353+
messageReceived = text == messageText
354+
}.catch({ err in XCTFail("error \(err)") })
355+
356+
try notifyConn.notify(channelName, message: messageText).wait()
357+
358+
sleep(1) // Wait for any delay in the message being received
359+
notifyConn.close()
360+
listenConn.close()
361+
XCTAssert(messageReceived)
362+
}
363+
345364
static var allTests = [
346365
("testVersion", testVersion),
347366
("testSelectTypes", testSelectTypes),
@@ -351,6 +370,7 @@ class PostgreSQLConnectionTests: XCTestCase {
351370
("testStruct", testStruct),
352371
("testNull", testNull),
353372
("testGH24", testGH24),
373+
("testNotifyAndListen", testNotifyAndListen)
354374
]
355375
}
356376

@@ -359,7 +379,10 @@ extension PostgreSQLConnection {
359379
static func makeTest() throws -> PostgreSQLConnection {
360380
let hostname: String
361381
#if Xcode
362-
hostname = (try? Process.execute("docker-machine", "ip")) ?? "192.168.99.100"
382+
//hostname = (try? Process.execute("docker-machine", "ip")) ?? "192.168.99.100"
383+
// TODO: Switch this back contrib_bootstrap wasnt working for me and I didnt feel like
384+
// delaying
385+
hostname = "localhost"
363386
#else
364387
hostname = "localhost"
365388
#endif

0 commit comments

Comments
 (0)