1
- import Async
2
1
import Crypto
3
2
import NIO
4
3
5
4
/// A PostgreSQL frontend client.
6
- public final class PostgreSQLConnection {
5
+ public final class PostgreSQLConnection : DatabaseConnection , BasicWorker {
6
+ /// See `BasicWorker`.
7
7
public var eventLoop : EventLoop {
8
8
return channel. eventLoop
9
9
}
10
10
11
11
/// Handles enqueued redis commands and responses.
12
- private let queue : QueueHandler < PostgreSQLMessage , PostgreSQLMessage >
12
+ internal let queue : QueueHandler < PostgreSQLMessage , PostgreSQLMessage >
13
13
14
14
/// The channel
15
15
private let channel : Channel
16
16
17
17
/// If non-nil, will log queries.
18
- public var logger : PostgreSQLLogger ?
18
+ public var logger : DatabaseLogger ?
19
+
20
+ /// See `DatabaseConnection`.
21
+ public var isClosed : Bool
22
+
23
+ /// See `Extendable`.
24
+ public var extend : Extend
19
25
20
26
/// Returns a new unique portal name.
21
27
internal var nextPortalName : String {
@@ -32,21 +38,63 @@ public final class PostgreSQLConnection {
32
38
/// A unique identifier for this connection, used to generate statment and portal names
33
39
private var uniqueNameCounter : UInt8
34
40
41
+ /// In-flight `send(...)` futures.
42
+ private var currentSend : Promise < Void > ?
43
+
44
+ /// The current query running, if one exists.
45
+ private var pipeline : Future < Void >
46
+
47
+ /// Block type to be called on close of connection
48
+ internal typealias CloseHandler = ( ( PostgreSQLConnection ) -> Future < Void > )
49
+ /// Called on close of the connection
50
+ internal var closeHandlers = [ CloseHandler] ( )
51
+ /// Handler type for Notifications
52
+ internal typealias NotificationHandler = ( String ) throws -> Void
53
+ /// Handlers to be stored by channel name
54
+ internal var notificationHandlers : [ String : NotificationHandler ] = [ : ]
55
+
35
56
/// Creates a new Redis client on the provided data source and sink.
36
57
init ( queue: QueueHandler < PostgreSQLMessage , PostgreSQLMessage > , channel: Channel ) {
37
58
self . queue = queue
38
59
self . channel = channel
39
60
self . uniqueNameCounter = 0
61
+ self . isClosed = false
62
+ self . extend = [ : ]
63
+ self . pipeline = channel. eventLoop. newSucceededFuture ( result: ( ) )
64
+ channel. closeFuture. always {
65
+ self . isClosed = true
66
+ if let current = self . currentSend {
67
+ current. fail ( error: closeError)
68
+ }
69
+ }
40
70
}
41
-
42
- deinit {
43
- close ( )
71
+ /// Sends `PostgreSQLMessage` to the server.
72
+ func send( _ message: [ PostgreSQLMessage ] ) -> Future < [ PostgreSQLMessage ] > {
73
+ var responses : [ PostgreSQLMessage ] = [ ]
74
+ return send ( message) { response in
75
+ responses. append ( response)
76
+ } . map ( to: [ PostgreSQLMessage ] . self) {
77
+ return responses
78
+ }
44
79
}
45
80
46
81
/// Sends `PostgreSQLMessage` to the server.
47
82
func send( _ messages: [ PostgreSQLMessage ] , onResponse: @escaping ( PostgreSQLMessage ) throws -> ( ) ) -> Future < Void > {
83
+ // if currentSend is not nil, previous send has not completed
84
+ assert ( currentSend == nil , " Attempting to call `send(...)` again before previous invocation has completed. " )
85
+
86
+ // ensure the connection is not closed
87
+ guard !isClosed else {
88
+ return eventLoop. newFailedFuture ( error: closeError)
89
+ }
90
+
91
+ // create a new promise and store it
92
+ let promise = eventLoop. newPromise ( Void . self)
93
+ currentSend = promise
94
+
95
+ // cascade this enqueue to the newly created promise
48
96
var error : Error ?
49
- return queue. enqueue ( messages) { message in
97
+ queue. enqueue ( messages) { message in
50
98
switch message {
51
99
case . readyForQuery:
52
100
if let e = error { throw e }
@@ -56,17 +104,28 @@ public final class PostgreSQLConnection {
56
104
default : try onResponse ( message)
57
105
}
58
106
return false // request until ready for query
59
- }
107
+ } . cascade ( promise: promise)
108
+
109
+ // when the promise completes, remove the reference to it
110
+ promise. futureResult. always { self . currentSend = nil }
111
+
112
+ // return the promise's future result (same as `queue.enqueue`)
113
+ return promise. futureResult
60
114
}
61
115
62
- /// Sends `PostgreSQLMessage` to the server.
63
- func send( _ message: [ PostgreSQLMessage ] ) -> Future < [ PostgreSQLMessage ] > {
64
- var responses : [ PostgreSQLMessage ] = [ ]
65
- return send ( message) { response in
66
- responses. append ( response)
67
- } . map ( to: [ PostgreSQLMessage ] . self) {
68
- return responses
116
+ /// Submits an async task to be pipelined.
117
+ internal func operation( _ work: @escaping ( ) -> Future < Void > ) -> Future < Void > {
118
+ /// perform this work when the current pipeline future is completed
119
+ let new = pipeline. then ( work)
120
+
121
+ /// append this work to the pipeline, discarding errors as the pipeline
122
+ //// does not care about them
123
+ pipeline = new. catchMap { err in
124
+ return ( )
69
125
}
126
+
127
+ /// return the newly enqueued work's future result
128
+ return new
70
129
}
71
130
72
131
/// Authenticates the `PostgreSQLClient` using a username with no password.
@@ -134,8 +193,29 @@ public final class PostgreSQLConnection {
134
193
}
135
194
}
136
195
196
+
137
197
/// Closes this client.
138
198
public func close( ) {
139
- channel . close ( promise : nil )
199
+ _ = executeCloseHandlersThenClose ( )
140
200
}
201
+
202
+
203
+ private func executeCloseHandlersThenClose( ) -> Future < Void > {
204
+ if let beforeClose = closeHandlers. popLast ( ) {
205
+ return beforeClose ( self ) . then { _ in
206
+ self . executeCloseHandlersThenClose ( )
207
+ }
208
+ } else {
209
+ return channel. close ( mode: . all)
210
+ }
211
+ }
212
+
213
+
214
+ /// Called when this class deinitializes.
215
+ deinit {
216
+ close ( )
217
+ }
218
+
141
219
}
220
+
221
+ private let closeError = PostgreSQLError ( identifier: " closed " , reason: " Connection is closed. " , source: . capture( ) )
0 commit comments