@@ -23,9 +23,7 @@ class PostgreSQLClientTests: XCTestCase {
23
23
let parserStream = PostgreSQLMessageParser ( ) . stream ( on: eventLoop)
24
24
let serializerStream = PostgreSQLMessageSerializer ( ) . stream ( on: eventLoop)
25
25
26
- let startup = PostgreSQLStartupMessage . versionThree ( parameters: [ " user " : " postgres " ] )
27
- let requests = StaticStream < PostgreSQLMessage > ( data: [ . startupMessage( startup) ] )
28
-
26
+ let requests = PushStream < PostgreSQLMessage > ( )
29
27
requests. stream ( to: serializerStream)
30
28
. output ( to: byteStream)
31
29
@@ -44,6 +42,9 @@ class PostgreSQLClientTests: XCTestCase {
44
42
print ( " Closed " )
45
43
} . upstream!. request ( count: . max)
46
44
45
+ let startup = PostgreSQLStartupMessage . versionThree ( parameters: [ " user " : " postgres " ] )
46
+ requests. push ( . startupMessage( startup) )
47
+
47
48
_ = try promise. future. await ( on: eventLoop)
48
49
}
49
50
@@ -52,37 +53,3 @@ class PostgreSQLClientTests: XCTestCase {
52
53
( " testStreaming " , testStreaming) ,
53
54
]
54
55
}
55
-
56
- public final class StaticStream < O> : Async . OutputStream , ConnectionContext {
57
- public typealias Output = O
58
-
59
- public var downstream : AnyInputStream < Output > ?
60
- public var data : [ Output ]
61
-
62
- public init ( data: [ Output ] ) {
63
- self . data = data. reversed ( )
64
- }
65
-
66
- public func connection( _ event: ConnectionEvent ) {
67
- switch event {
68
- case . cancel:
69
- data = [ ]
70
- case . request( var count) :
71
- stream: while count > 0 {
72
- count -= 1
73
- if let data = self . data. popLast ( ) {
74
- downstream!. next ( data)
75
- } else {
76
- // out of data
77
- break stream
78
- }
79
- }
80
- }
81
- }
82
-
83
- public func output< S> ( to inputStream: S ) where S: Async . InputStream , S. Input == Output {
84
- self . downstream = . init( inputStream)
85
- inputStream. connect ( to: self )
86
- }
87
-
88
- }
0 commit comments