1
- import Async
2
1
import Crypto
3
2
import NIO
4
3
5
4
/// A PostgreSQL frontend client.
6
- public final class PostgreSQLConnection {
5
+ public final class PostgreSQLConnection : DatabaseConnection , BasicWorker {
6
+ /// See `BasicWorker`.
7
7
public var eventLoop : EventLoop {
8
8
return channel. eventLoop
9
9
}
@@ -15,7 +15,13 @@ public final class PostgreSQLConnection {
15
15
private let channel : Channel
16
16
17
17
/// If non-nil, will log queries.
18
- public var logger : PostgreSQLLogger ?
18
+ public var logger : DatabaseLogger ?
19
+
20
+ /// See `DatabaseConnection`.
21
+ public var isClosed : Bool
22
+
23
+ /// See `Extendable`.
24
+ public var extend : Extend
19
25
20
26
/// Returns a new unique portal name.
21
27
internal var nextPortalName : String {
@@ -32,21 +38,54 @@ public final class PostgreSQLConnection {
32
38
/// A unique identifier for this connection, used to generate statment and portal names
33
39
private var uniqueNameCounter : UInt8
34
40
41
+ /// In-flight `send(...)` futures.
42
+ private var currentSend : Promise < Void > ?
43
+
44
+ /// The current query running, if one exists.
45
+ private var pipeline : Future < Void >
46
+
35
47
/// Creates a new Redis client on the provided data source and sink.
36
48
init ( queue: QueueHandler < PostgreSQLMessage , PostgreSQLMessage > , channel: Channel ) {
37
49
self . queue = queue
38
50
self . channel = channel
39
51
self . uniqueNameCounter = 0
52
+ self . isClosed = false
53
+ self . extend = [ : ]
54
+ self . pipeline = channel. eventLoop. newSucceededFuture ( result: ( ) )
55
+ channel. closeFuture. always {
56
+ self . isClosed = true
57
+ if let current = self . currentSend {
58
+ current. fail ( error: closeError)
59
+ }
60
+ }
40
61
}
41
-
42
- deinit {
43
- close ( )
62
+ /// Sends `PostgreSQLMessage` to the server.
63
+ func send( _ message: [ PostgreSQLMessage ] ) -> Future < [ PostgreSQLMessage ] > {
64
+ var responses : [ PostgreSQLMessage ] = [ ]
65
+ return send ( message) { response in
66
+ responses. append ( response)
67
+ } . map ( to: [ PostgreSQLMessage ] . self) {
68
+ return responses
69
+ }
44
70
}
45
71
46
72
/// Sends `PostgreSQLMessage` to the server.
47
73
func send( _ messages: [ PostgreSQLMessage ] , onResponse: @escaping ( PostgreSQLMessage ) throws -> ( ) ) -> Future < Void > {
74
+ // if currentSend is not nil, previous send has not completed
75
+ assert ( currentSend == nil , " Attempting to call `send(...)` again before previous invocation has completed. " )
76
+
77
+ // ensure the connection is not closed
78
+ guard !isClosed else {
79
+ return eventLoop. newFailedFuture ( error: closeError)
80
+ }
81
+
82
+ // create a new promise and store it
83
+ let promise = eventLoop. newPromise ( Void . self)
84
+ currentSend = promise
85
+
86
+ // cascade this enqueue to the newly created promise
48
87
var error : Error ?
49
- return queue. enqueue ( messages) { message in
88
+ queue. enqueue ( messages) { message in
50
89
switch message {
51
90
case . readyForQuery:
52
91
if let e = error { throw e }
@@ -56,17 +95,28 @@ public final class PostgreSQLConnection {
56
95
default : try onResponse ( message)
57
96
}
58
97
return false // request until ready for query
59
- }
98
+ } . cascade ( promise: promise)
99
+
100
+ // when the promise completes, remove the reference to it
101
+ promise. futureResult. always { self . currentSend = nil }
102
+
103
+ // return the promise's future result (same as `queue.enqueue`)
104
+ return promise. futureResult
60
105
}
61
106
62
- /// Sends `PostgreSQLMessage` to the server.
63
- func send( _ message: [ PostgreSQLMessage ] ) -> Future < [ PostgreSQLMessage ] > {
64
- var responses : [ PostgreSQLMessage ] = [ ]
65
- return send ( message) { response in
66
- responses. append ( response)
67
- } . map ( to: [ PostgreSQLMessage ] . self) {
68
- return responses
107
+ /// Submits an async task to be pipelined.
108
+ internal func operation( _ work: @escaping ( ) -> Future < Void > ) -> Future < Void > {
109
+ /// perform this work when the current pipeline future is completed
110
+ let new = pipeline. then ( work)
111
+
112
+ /// append this work to the pipeline, discarding errors as the pipeline
113
+ //// does not care about them
114
+ pipeline = new. catchMap { err in
115
+ return ( )
69
116
}
117
+
118
+ /// return the newly enqueued work's future result
119
+ return new
70
120
}
71
121
72
122
/// Authenticates the `PostgreSQLClient` using a username with no password.
@@ -146,4 +196,12 @@ public final class PostgreSQLConnection {
146
196
channel. close ( promise: nil )
147
197
}
148
198
}
199
+
200
+ /// Called when this class deinitializes.
201
+ deinit {
202
+ close ( )
203
+ }
204
+
149
205
}
206
+
207
+ private let closeError = PostgreSQLError ( identifier: " closed " , reason: " Connection is closed. " , source: . capture( ) )
0 commit comments