// PostgresConnection+Notifications.swift
// Forked from vapor/postgres-nio.
import NIO
import Logging
/// Handle for a registered `NotificationResponse` listener on a connection, part of the support for PostgreSQL's `LISTEN`/`NOTIFY`.
public final class PostgresListenContext {
    /// Closure that performs the actual detach. It is cleared by `stop()`, so calling `stop()` again does nothing.
    var stopper: (() -> Void)?

    /// Detaches this listener so that it stops receiving notifications.
    ///
    /// Other listeners, including ones registered for the same channel, keep working. No `UNLISTEN` is sent on your behalf; issue an `UNLISTEN` query yourself if your application needs one.
    public func stop() {
        // Clear the stored closure on the way out, after it has run.
        defer { self.stopper = nil }
        if let detach = self.stopper {
            detach()
        }
    }
}
extension PostgresConnection {
    /// Adds a handler for `NotificationResponse` messages on a given channel.
    ///
    /// This is the receiving half of PostgreSQL's `LISTEN`/`NOTIFY` support: register a listener with this method to handle the `NotificationResponse` messages, then issue a `LISTEN` query to instruct PostgreSQL to begin sending them.
    ///
    /// - Parameters:
    ///   - channel: Name of the notification channel; only notifications whose channel equals this value are passed to the handler.
    ///   - notificationHandler: Closure invoked with the listen context and the parsed notification for each matching message.
    /// - Returns: A context whose `stop()` removes the listener from the connection's pipeline. Calling `stop()` after the pipeline has already released the listener is a safe no-op.
    @discardableResult
    public func addListener(
        channel: String,
        handler notificationHandler: @escaping (PostgresListenContext, PostgresMessage.NotificationResponse) -> Void
    ) -> PostgresListenContext {
        let listenContext = PostgresListenContext()
        let channelHandler = PostgresNotificationHandler(
            logger: self.logger,
            channel: channel,
            notificationHandler: notificationHandler,
            listenContext: listenContext
        )
        let pipeline = self.channel.pipeline
        let logger = self.logger

        // Surface a failed registration instead of silently dropping it;
        // otherwise the listener would simply never fire with no trace of why.
        pipeline.addHandler(channelHandler, name: nil, position: .last).whenFailure { error in
            logger.error("Failed to add notification listener: \(error)")
        }

        // The handler strongly references `listenContext`, so capturing the handler strongly here
        // would create a retain cycle. It is captured weakly rather than unowned because the
        // context can outlive the handler (e.g. the channel closed, or the add above failed);
        // an unowned capture would trap in `stop()` in that situation.
        listenContext.stopper = { [pipeline, weak channelHandler] in
            guard let channelHandler = channelHandler else { return }
            _ = pipeline.removeHandler(channelHandler)
        }
        return listenContext
    }
}
/// Inbound channel handler that forwards every message downstream and, for `NotificationResponse` messages on its channel, invokes the registered notification closure.
final class PostgresNotificationHandler: ChannelInboundHandler, RemovableChannelHandler {
    typealias InboundIn = PostgresMessage
    typealias InboundOut = PostgresMessage

    /// Logger used to report notifications that fail to parse.
    let logger: Logger
    /// Channel name a notification must carry to be delivered to `notificationHandler`.
    let channel: String
    /// Closure called with the listen context and each matching notification.
    let notificationHandler: (PostgresListenContext, PostgresMessage.NotificationResponse) -> Void
    /// Context handed to `notificationHandler`, which it can use to stop listening.
    let listenContext: PostgresListenContext

    init(
        logger: Logger,
        channel: String,
        notificationHandler: @escaping (PostgresListenContext, PostgresMessage.NotificationResponse) -> Void,
        listenContext: PostgresListenContext
    ) {
        self.logger = logger
        self.channel = channel
        self.notificationHandler = notificationHandler
        self.listenContext = listenContext
    }

    /// Passes the message along the pipeline, then delivers it to the notification closure if it is a notification for this handler's channel.
    func channelRead(context: ChannelHandlerContext, data: NIOAny) {
        let message = self.unwrapInboundIn(data)

        // Order matters: forward downstream before running the notification closure.
        // The closure may stop the listener, which removes this handler from the
        // pipeline, after which fireChannelRead would no longer work.
        context.fireChannelRead(self.wrapInboundOut(message))

        guard message.identifier == .notificationResponse else { return }

        let notification: PostgresMessage.NotificationResponse
        do {
            var payload = message.data
            notification = try PostgresMessage.NotificationResponse.parse(from: &payload)
        } catch {
            self.logger.error("\(error)")
            return
        }

        // Notifications for other channels are not for this listener.
        guard notification.channel == self.channel else { return }
        self.notificationHandler(self.listenContext, notification)
    }
}