@@ -3,11 +3,11 @@ import Async
3
3
/// A PostgreSQL frontend client.
4
4
final class PostgreSQLClient {
5
5
/// Handles enqueued redis commands and responses.
6
- private let queueStream : QueueStream < PostgreSQLMessage , PostgreSQLMessage >
6
+ private let queueStream : AsymmetricQueueStream < PostgreSQLMessage , PostgreSQLMessage >
7
7
8
8
/// Creates a new Redis client on the provided data source and sink.
9
9
init < Stream> ( stream: Stream , on worker: Worker ) where Stream: ByteStream {
10
- let queueStream = QueueStream < PostgreSQLMessage , PostgreSQLMessage > ( )
10
+ let queueStream = AsymmetricQueueStream < PostgreSQLMessage , PostgreSQLMessage > ( )
11
11
12
12
let serializerStream = PostgreSQLMessageSerializer ( ) . stream ( on: worker)
13
13
let parserStream = PostgreSQLMessageParser ( ) . stream ( on: worker)
@@ -20,8 +20,134 @@ final class PostgreSQLClient {
20
20
self . queueStream = queueStream
21
21
}
22
22
23
- /// Sends `RedisData` to the server.
24
- func send( _ data: PostgreSQLMessage ) -> Future < PostgreSQLMessage > {
25
- return queueStream. enqueue ( data)
23
+ /// Sends `PostgreSQLMessage` to the server.
24
+ func send( _ message: PostgreSQLMessage ) -> Future < [ PostgreSQLMessage ] > {
25
+ return queueStream. enqueue ( message) { message in
26
+ switch message {
27
+ case . readyForQuery: return true
28
+ case . errorResponse( let e) : throw e
29
+ default : return false
30
+ }
31
+ }
32
+ }
33
+ }
34
+
35
+ /// Enqueues a single input and waits for multiple output.
36
+ /// This is useful for situations where one request can lead
37
+ /// to multiple responses.
38
+ public final class AsymmetricQueueStream < I, O> : Stream , ConnectionContext {
39
+ /// See `InputStream.Input`
40
+ public typealias Input = I
41
+
42
+ /// See `OutputStream.Output`
43
+ public typealias Output = O
44
+
45
+ /// Current upstream output stream.
46
+ private var upstream : ConnectionContext ?
47
+
48
+ /// Current downstrema input stream.
49
+ private var downstream : AnyInputStream < Output > ?
50
+
51
+ /// Current downstream demand.
52
+ private var downstreamDemand : UInt
53
+
54
+ /// Queued output.
55
+ private var queuedOutput : [ Output ]
56
+
57
+ /// Queued input.
58
+ private var queuedInput : [ AsymmetricQueueStreamInput < Input > ]
59
+
60
+ /// Current input being handled.
61
+ private var currentInput : AsymmetricQueueStreamInput < Input > ?
62
+
63
+ /// Create a new `AsymmetricQueueStream`.
64
+ public init ( ) {
65
+ self . downstreamDemand = 0
66
+ self . queuedOutput = [ ]
67
+ self . queuedInput = [ ]
68
+ }
69
+
70
+ /// Enqueue the supplied output, specifying a closure that will determine
71
+ /// when the Input received is ready.
72
+ public func enqueue( _ output: Output , readyCheck: @escaping ( Input ) throws -> Bool ) -> Future < [ Input ] > {
73
+ let input = AsymmetricQueueStreamInput ( readyCheck: readyCheck)
74
+ self . queuedInput. insert ( input, at: 0 )
75
+ self . queuedOutput. insert ( output, at: 0 )
76
+ upstream!. request ( )
77
+ update ( )
78
+ return input. promise. future
79
+ }
80
+
81
+ /// Updates internal state.
82
+ private func update( ) {
83
+ while downstreamDemand > 0 {
84
+ guard let output = queuedOutput. popLast ( ) else {
85
+ break
86
+ }
87
+ downstreamDemand -= 1
88
+ downstream!. next ( output)
89
+ }
90
+ }
91
+
92
+ /// See `ConnectionContext.connection`
93
+ public func connection( _ event: ConnectionEvent ) {
94
+ switch event {
95
+ case . cancel: break // handle better
96
+ case . request( let count) :
97
+ downstreamDemand += count
98
+ update ( )
99
+ }
100
+ }
101
+
102
+ /// See `InputStream.input`
103
+ public func input( _ event: InputEvent < I > ) {
104
+ switch event {
105
+ case . close: downstream? . close ( )
106
+ case . connect( let upstream) :
107
+ self . upstream = upstream
108
+ update ( )
109
+ case . error( let error) : downstream? . error ( error)
110
+ case . next( let input) :
111
+ var context : AsymmetricQueueStreamInput < Input >
112
+ if let current = currentInput {
113
+ context = current
114
+ } else {
115
+ print ( " extra: \( input) " )
116
+ let next = queuedInput. popLast ( ) !
117
+ currentInput = next
118
+ context = next
119
+ }
120
+
121
+ context. storage. append ( input)
122
+ do {
123
+ if try context. readyCheck ( input) {
124
+ context. promise. complete ( context. storage)
125
+ currentInput = nil
126
+ } else {
127
+ upstream!. request ( count: 1 )
128
+ }
129
+ } catch {
130
+ context. promise. fail ( error)
131
+ currentInput = nil
132
+ }
133
+ }
134
+ }
135
+
136
+ /// See `OutputStream.output`
137
+ public func output< S> ( to inputStream: S ) where S : InputStream , S. Input == Output {
138
+ downstream = . init( inputStream)
139
+ inputStream. connect ( to: self )
140
+ }
141
+ }
142
+
143
+ final class AsymmetricQueueStreamInput < Input> {
144
+ var storage : [ Input ]
145
+ var promise : Promise < [ Input ] >
146
+ var readyCheck : ( Input ) throws -> Bool
147
+
148
+ init ( readyCheck: @escaping ( Input ) throws -> Bool ) {
149
+ self . storage = [ ]
150
+ self . promise = . init( )
151
+ self . readyCheck = readyCheck
26
152
}
27
153
}
0 commit comments