Skip to content

Commit 95e5b6f

Browse files
authoredApr 25, 2018
Merge branch 'notify-listen' into dbkit-gm
2 parents f8092e3 + b4ca67f commit 95e5b6f

File tree

6 files changed

+86
-4
lines changed

6 files changed

+86
-4
lines changed
 
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
import Async
2+
3+
extension PostgreSQLConnection {
4+
public func listen(
5+
_ channelName: String,
6+
handler: @escaping (String) throws -> ()
7+
) throws -> Future<Void> {
8+
beforeClose = { conn in
9+
let query = PostgreSQLQuery(query: "UNLISTEN \"\(channelName)\";")
10+
return conn.send([.query(query)], onResponse: { _ in })
11+
}
12+
let query = PostgreSQLQuery(query: "LISTEN \"\(channelName)\";")
13+
return queue.enqueue([.query(query)], onInput: { message in
14+
switch message {
15+
case let .notificationResponse(notification):
16+
try handler(notification.message)
17+
default:
18+
break
19+
}
20+
return false
21+
})
22+
}
23+
24+
public func notify(
25+
_ channelName: String, message: String) throws -> Future<Void> {
26+
let query = PostgreSQLQuery(query: "NOTIFY \"\(channelName)\", '\(message)';")
27+
return send([.query(query)]).map(to: Void.self, { _ in })
28+
}
29+
}

‎Sources/PostgreSQL/Connection/PostgreSQLConnection.swift

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ public final class PostgreSQLConnection: DatabaseConnection, BasicWorker {
99
}
1010

1111
/// Handles enqueued redis commands and responses.
12-
private let queue: QueueHandler<PostgreSQLMessage, PostgreSQLMessage>
12+
internal let queue: QueueHandler<PostgreSQLMessage, PostgreSQLMessage>
1313

1414
/// The channel
1515
private let channel: Channel
@@ -184,10 +184,17 @@ public final class PostgreSQLConnection: DatabaseConnection, BasicWorker {
184184
}
185185
}
186186

187+
internal var beforeClose: ((PostgreSQLConnection) -> Future<Void>)?
188+
187189
/// Closes this client.
188190
public func close() {
189-
isClosed = true
190-
channel.close(promise: nil)
191+
if let beforeClose = beforeClose {
192+
_ = beforeClose(self).then { _ in
193+
self.channel.close(mode: CloseMode.all)
194+
}
195+
} else {
196+
channel.close(promise: nil)
197+
}
191198
}
192199

193200
/// Called when this class deinitializes.

‎Sources/PostgreSQL/Message+Parse/PostgreSQLMessageDecoder.swift

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ final class PostgreSQLMessageDecoder: ByteToMessageDecoder {
4949
let decoder = _PostgreSQLMessageDecoder(data: messageData)
5050
let message: PostgreSQLMessage
5151
switch messageType {
52+
case .A: message = try .notificationResponse(decoder.decode())
5253
case .E: message = try .error(decoder.decode())
5354
case .N: message = try .notice(decoder.decode())
5455
case .R: message = try .authenticationRequest(decoder.decode())

‎Sources/PostgreSQL/Message/PostgreSQLMessage.swift

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ enum PostgreSQLMessage {
77
case error(PostgreSQLDiagnosticResponse)
88
/// Identifies the message as a notice.
99
case notice(PostgreSQLDiagnosticResponse)
10+
/// Identifies the message as a notification response.
11+
case notificationResponse(PostgreSQLNotificationResponse)
1012
/// One of the various authentication request message formats.
1113
case authenticationRequest(PostgreSQLAuthenticationRequest)
1214
/// Identifies the message as a password response.
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
import Foundation
2+
3+
struct PostgreSQLNotificationResponse: Decodable {
4+
/// The message coming from PSQL
5+
let message: String
6+
init(from decoder: Decoder) throws {
7+
let container = try decoder.singleValueContainer()
8+
_ = try container.decode(Int32.self) // message length
9+
_ = try container.decode(Int32.self) // process id of message
10+
let channelId = try container.decode(String.self)
11+
let message = try? container.decode(String.self)
12+
self.message = message ?? channelId
13+
}
14+
}

‎Tests/PostgreSQLTests/PostgreSQLConnectionTests.swift

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import PostgreSQL
55
import Core
66

77
class PostgreSQLConnectionTests: XCTestCase {
8+
let defaultTimeout = 5.0
89
func testVersion() throws {
910
let client = try PostgreSQLConnection.makeTest()
1011
let results = try client.simpleQuery("SELECT version();").wait()
@@ -342,6 +343,32 @@ class PostgreSQLConnectionTests: XCTestCase {
342343
_ = try categories.wait()
343344
}
344345

346+
func testNotifyAndListen() throws {
347+
let completionHandlerExpectation1 = expectation(description: "first completion handler called")
348+
let completionHandlerExpectation2 = expectation(description: "final completion handler called")
349+
let notifyConn = try PostgreSQLConnection.makeTest()
350+
let listenConn = try PostgreSQLConnection.makeTest()
351+
let channelName = "Foo"
352+
let messageText = "Bar"
353+
let finalMessageText = "Baz"
354+
355+
try listenConn.listen(channelName) { text in
356+
if text == messageText {
357+
completionHandlerExpectation1.fulfill()
358+
} else if text == finalMessageText {
359+
completionHandlerExpectation2.fulfill()
360+
}
361+
}.catch({ err in XCTFail("error \(err)") })
362+
363+
try notifyConn.notify(channelName, message: messageText).wait()
364+
try notifyConn.notify(channelName, message: finalMessageText).wait()
365+
366+
notifyConn.close()
367+
listenConn.close()
368+
waitForExpectations(timeout: defaultTimeout)
369+
}
370+
371+
345372
func testURLParsing() throws {
346373
let databaseURL = "postgres://username:password@hostname.com:5432/database"
347374
let config = try PostgreSQLDatabaseConfig(url: databaseURL)
@@ -361,6 +388,7 @@ class PostgreSQLConnectionTests: XCTestCase {
361388
("testStruct", testStruct),
362389
("testNull", testNull),
363390
("testGH24", testGH24),
391+
("testNotifyAndListen", testNotifyAndListen),
364392
("testURLParsing", testURLParsing),
365393
]
366394
}
@@ -370,7 +398,8 @@ extension PostgreSQLConnection {
370398
static func makeTest() throws -> PostgreSQLConnection {
371399
let hostname: String
372400
#if Xcode
373-
hostname = (try? Process.execute("docker-machine", "ip")) ?? "192.168.99.100"
401+
//hostname = (try? Process.execute("docker-machine", "ip")) ?? "192.168.99.100"
402+
hostname = "localhost"
374403
#else
375404
hostname = "localhost"
376405
#endif

0 commit comments

Comments
 (0)
Please sign in to comment.